From 8ea6b75efc45f5bd3f664aa69d0a13abb1d5a28e Mon Sep 17 00:00:00 2001 From: Pasqual Koschmieder Date: Mon, 1 Jul 2024 13:22:13 +0200 Subject: [PATCH] standardize file querying from a node --- checkstyle.xml | 2 +- .../cloudnetservice/common/io/FileUtil.java | 3 + .../chunk/ChunkedFileQueryBuilder.java | 122 ++++++++++++ .../network/chunk/ChunkedPacketHandler.java | 3 +- .../DefaultFileChunkedPacketHandler.java | 13 +- .../DefaultChunkedFileQueryBuilder.java | 176 ++++++++++++++++++ .../FileQueryChannelMessageListener.java | 100 ++++++++++ .../chunk/event/FileQueryRequestEvent.java | 86 +++++++++ .../defaults/RemoteTemplateStorage.java | 64 +++---- .../java/eu/cloudnetservice/node/Node.java | 2 + .../chunk/FileDeployCallbackListener.java | 102 ++++------ .../chunk/StaticServiceDeployCallback.java | 6 +- .../network/chunk/TemplateDeployCallback.java | 4 +- .../chunk/TemplateFileDeployCallback.java | 4 +- 14 files changed, 575 insertions(+), 112 deletions(-) create mode 100644 driver/src/main/java/eu/cloudnetservice/driver/network/chunk/ChunkedFileQueryBuilder.java create mode 100644 driver/src/main/java/eu/cloudnetservice/driver/network/chunk/defaults/builder/DefaultChunkedFileQueryBuilder.java create mode 100644 driver/src/main/java/eu/cloudnetservice/driver/network/chunk/event/FileQueryChannelMessageListener.java create mode 100644 driver/src/main/java/eu/cloudnetservice/driver/network/chunk/event/FileQueryRequestEvent.java diff --git a/checkstyle.xml b/checkstyle.xml index 70ad146f77..8e77c754ef 100644 --- a/checkstyle.xml +++ b/checkstyle.xml @@ -332,7 +332,7 @@ - + diff --git a/common/src/main/java/eu/cloudnetservice/common/io/FileUtil.java b/common/src/main/java/eu/cloudnetservice/common/io/FileUtil.java index 4b6156516f..a69b4bc392 100644 --- a/common/src/main/java/eu/cloudnetservice/common/io/FileUtil.java +++ b/common/src/main/java/eu/cloudnetservice/common/io/FileUtil.java @@ -63,6 +63,9 @@ public final class FileUtil { .filter(prov -> prov.getScheme().equalsIgnoreCase("jar")) .findFirst() .orElseThrow(() -> new ExceptionInInitializerError("Unable to find a file system provider supporting jars")); + + // pre-create the temporary directory + FileUtil.createDirectory(TEMP_DIR); } private FileUtil() { diff --git a/driver/src/main/java/eu/cloudnetservice/driver/network/chunk/ChunkedFileQueryBuilder.java b/driver/src/main/java/eu/cloudnetservice/driver/network/chunk/ChunkedFileQueryBuilder.java new file mode 100644 index 0000000000..c5445b932e --- /dev/null +++ b/driver/src/main/java/eu/cloudnetservice/driver/network/chunk/ChunkedFileQueryBuilder.java @@ -0,0 +1,122 @@ +/* + * Copyright 2019-2024 CloudNetService team & contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package eu.cloudnetservice.driver.network.chunk; + +import eu.cloudnetservice.driver.network.buffer.DataBuf; +import eu.cloudnetservice.driver.network.chunk.defaults.builder.DefaultChunkedFileQueryBuilder; +import java.io.InputStream; +import java.nio.file.Path; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import lombok.NonNull; + +/** + * A builder that can be used to query a file from a remote network component. + * + * @since 4.0 + */ +public interface ChunkedFileQueryBuilder { + + /** + * Constructs a new default implementation of this builder. + * + * @return a new default implementation of this builder. + */ + static @NonNull ChunkedFileQueryBuilder create() { + return new DefaultChunkedFileQueryBuilder(); + } + + /** + * The size of the chunks in bytes that will be transferred from the remote to the caller. + * + * @param chunkSize the size of the chunks that should be transferred from the remote to the caller. + * @return this builder, for chaining. + * @throws IllegalArgumentException if the given chunk size is not positive. + */ + @NonNull + ChunkedFileQueryBuilder chunkSize(int chunkSize); + + /** + * The identifier for the data should be transferred from the remote to the caller. + * + * @param dataIdentifier the identifier of the data that should be transferred. + * @return this builder, for chaining. + * @throws NullPointerException if the given identifier is null. + * @throws IllegalArgumentException if the given identifier is empty. + */ + @NonNull + ChunkedFileQueryBuilder dataIdentifier(@NonNull String dataIdentifier); + + /** + * Sets that the file data should be requested from the node with the given id. Note: the source must be directly + * available to the caller, else the data will never reach the caller. + * + * @param nodeId the id of the node to request the data from. + * @return this builder, for chaining. + * @throws NullPointerException if the given node id is null. + */ + @NonNull + ChunkedFileQueryBuilder requestFromNode(@NonNull String nodeId); + + /** + * Sets that the file data should be requested from the service with the given name. Note: the source must be directly + * available to the caller, else the data will never reach the caller. + * + * @param serviceName the name of the service to request the data from. + * @return this builder, for chaining. + * @throws NullPointerException if the given service name is null. + */ + @NonNull + ChunkedFileQueryBuilder requestFromService(@NonNull String serviceName); + + /** + * A consumer that is called when the actual request is sent to the data source. The consumer can append additional + * information to the buffer that is relevant for the remote side to identify the data to transmit. + * + * @param bufferConfigurer the configurator for additional transmitted information about the request. + * @return this builder, for chaining. + * @throws NullPointerException if the given buffer configurator is null. + */ + @NonNull + ChunkedFileQueryBuilder configureMessageBuffer(@NonNull Consumer bufferConfigurer); + + /** + * Queries the data from the remote and completes with an input stream when the transmission was successful. + * + * @return a future completed with an input stream of the transmitted data. + */ + @NonNull + CompletableFuture query(); + + /** + * Queries the data from the remote and redirects it into a temporary file. The path to the file is wrapped in the + * returned future. + * + * @return a future completed with the path to which the transmitted contents were written. + */ + @NonNull + CompletableFuture queryToTempFile(); + + /** + * Queries the data from the remote and redirects it into the given file path. + * + * @param target the path to the file into which the response data should be written. + * @return a future completed with the path to which the transmitted contents were written. + */ + @NonNull + CompletableFuture queryToPath(@NonNull Path target); +} diff --git a/driver/src/main/java/eu/cloudnetservice/driver/network/chunk/ChunkedPacketHandler.java b/driver/src/main/java/eu/cloudnetservice/driver/network/chunk/ChunkedPacketHandler.java index 0f357e937f..f12c7662a1 100644 --- a/driver/src/main/java/eu/cloudnetservice/driver/network/chunk/ChunkedPacketHandler.java +++ b/driver/src/main/java/eu/cloudnetservice/driver/network/chunk/ChunkedPacketHandler.java @@ -60,9 +60,10 @@ interface Callback { * * @param information the information about the chunk session which were sent initially. * @param dataInput the stream of data sent to this component in this chunked session. + * @return true if the full data was consumed and the given stream can be closed, false otherwise. * @throws IOException in case an i/o exception happens during result handling. */ - void handleSessionComplete( + boolean handleSessionComplete( @NonNull ChunkSessionInformation information, @NonNull InputStream dataInput) throws IOException; } diff --git a/driver/src/main/java/eu/cloudnetservice/driver/network/chunk/defaults/DefaultFileChunkedPacketHandler.java b/driver/src/main/java/eu/cloudnetservice/driver/network/chunk/defaults/DefaultFileChunkedPacketHandler.java index 10bf60f58f..b947083b81 100644 --- a/driver/src/main/java/eu/cloudnetservice/driver/network/chunk/defaults/DefaultFileChunkedPacketHandler.java +++ b/driver/src/main/java/eu/cloudnetservice/driver/network/chunk/defaults/DefaultFileChunkedPacketHandler.java @@ -22,6 +22,7 @@ import eu.cloudnetservice.driver.network.chunk.TransferStatus; import eu.cloudnetservice.driver.network.chunk.data.ChunkSessionInformation; import java.io.IOException; +import java.io.InputStream; import java.io.RandomAccessFile; import java.nio.file.Files; import java.nio.file.Path; @@ -138,8 +139,16 @@ public boolean handleChunkPart(int chunkPosition, @NonNull DataBuf dataBuf) { // call the write completion handler, if present if (this.writeCompleteHandler != null) { - try (var inputStream = Files.newInputStream(this.tempFilePath, StandardOpenOption.DELETE_ON_CLOSE)) { - this.writeCompleteHandler.handleSessionComplete(this.chunkSessionInformation, inputStream); + var closeStream = true; + var stream = InputStream.nullInputStream(); + try { + // open the stream to the data and post it to the write handler + stream = Files.newInputStream(this.tempFilePath, StandardOpenOption.DELETE_ON_CLOSE); + closeStream = this.writeCompleteHandler.handleSessionComplete(this.chunkSessionInformation, stream); + } finally { + if (closeStream) { + stream.close(); + } } } diff --git a/driver/src/main/java/eu/cloudnetservice/driver/network/chunk/defaults/builder/DefaultChunkedFileQueryBuilder.java b/driver/src/main/java/eu/cloudnetservice/driver/network/chunk/defaults/builder/DefaultChunkedFileQueryBuilder.java new file mode 100644 index 0000000000..beeb3df8b0 --- /dev/null +++ b/driver/src/main/java/eu/cloudnetservice/driver/network/chunk/defaults/builder/DefaultChunkedFileQueryBuilder.java @@ -0,0 +1,176 @@ +/* + * Copyright 2019-2024 CloudNetService team & contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package eu.cloudnetservice.driver.network.chunk.defaults.builder; + +import com.google.common.base.Preconditions; +import eu.cloudnetservice.common.io.FileUtil; +import eu.cloudnetservice.driver.channel.ChannelMessage; +import eu.cloudnetservice.driver.channel.ChannelMessageTarget; +import eu.cloudnetservice.driver.inject.InjectionLayer; +import eu.cloudnetservice.driver.network.buffer.DataBuf; +import eu.cloudnetservice.driver.network.buffer.DataBufFactory; +import eu.cloudnetservice.driver.network.chunk.ChunkedFileQueryBuilder; +import eu.cloudnetservice.driver.network.chunk.data.ChunkSessionInformation; +import eu.cloudnetservice.driver.network.chunk.defaults.ChunkedSessionRegistry; +import eu.cloudnetservice.driver.network.chunk.defaults.DefaultFileChunkedPacketHandler; +import eu.cloudnetservice.driver.network.def.NetworkConstants; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.file.Path; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import lombok.NonNull; + +/** + * Default implementation of a builder for a file query from a remote network component. + * + * @since 4.0 + */ +public class DefaultChunkedFileQueryBuilder implements ChunkedFileQueryBuilder { + + private static final DataBuf EMPTY_BUFFER; + + static { + // buffer is never used, so we just allocate an empty buffer and release it immediately + EMPTY_BUFFER = DataBufFactory.defaultFactory().createWithExpectedSize(0); + EMPTY_BUFFER.release(); + } + + private String dataIdentifier; + private ChannelMessageTarget dataSource; + private Consumer messageBufferConfigurator; + private int chunkSize = DefaultChunkedPacketSenderBuilder.DEFAULT_CHUNK_SIZE; + + /** + * {@inheritDoc} + */ + @Override + public @NonNull ChunkedFileQueryBuilder chunkSize(int chunkSize) { + Preconditions.checkArgument(chunkSize > 0, "chunk size must be greater than 0"); + this.chunkSize = chunkSize; + return this; + } + + /** + * {@inheritDoc} + */ + @Override + public @NonNull ChunkedFileQueryBuilder dataIdentifier(@NonNull String dataIdentifier) { + Preconditions.checkArgument(!dataIdentifier.isBlank(), "data identifier cannot be empty"); + this.dataIdentifier = dataIdentifier; + return this; + } + + /** + * {@inheritDoc} + */ + @Override + public @NonNull ChunkedFileQueryBuilder requestFromNode(@NonNull String nodeId) { + this.dataSource = ChannelMessageTarget.of(ChannelMessageTarget.Type.NODE, nodeId); + return this; + } + + /** + * {@inheritDoc} + */ + @Override + public @NonNull ChunkedFileQueryBuilder requestFromService(@NonNull String serviceName) { + this.dataSource = ChannelMessageTarget.of(ChannelMessageTarget.Type.SERVICE, serviceName); + return this; + } + + /** + * {@inheritDoc} + */ + @Override + public @NonNull ChunkedFileQueryBuilder configureMessageBuffer(@NonNull Consumer bufferConfigurer) { + this.messageBufferConfigurator = bufferConfigurer; + return this; + } + + /** + * {@inheritDoc} + */ + @Override + public @NonNull CompletableFuture query() { + Preconditions.checkNotNull(this.dataIdentifier, "no data id provided"); + Preconditions.checkNotNull(this.dataSource, "no data source provided"); + + // configure the base message buffer + var sessionId = UUID.randomUUID(); + var queryBuffer = DataBuf.empty() + .writeInt(this.chunkSize) + .writeUniqueId(sessionId) + .writeString(this.dataIdentifier); + if (this.messageBufferConfigurator != null) { + this.messageBufferConfigurator.accept(queryBuffer); + } + + // register the session that is responsible for handling the response + var responseFuture = new CompletableFuture(); + var sessionRegistry = InjectionLayer.boot().instance(ChunkedSessionRegistry.class); + var sessionInfo = new ChunkSessionInformation(this.chunkSize, sessionId, "query:dummy", EMPTY_BUFFER); + var handler = new DefaultFileChunkedPacketHandler(sessionInfo, (_, stream) -> !responseFuture.complete(stream)); + sessionRegistry.registerSession(sessionId, handler); + + // send the request to transmit the data + var channelMessage = ChannelMessage.builder() + .channel(NetworkConstants.INTERNAL_MSG_CHANNEL) + .message("chunked_query_file") + .target(this.dataSource) + .buffer(queryBuffer) + .build(); + return channelMessage + .sendSingleQueryAsync() + .thenCompose(response -> { + var responseData = response.content(); + if (responseData.readBoolean()) { + // transfer started successfully + return responseFuture; + } else { + // transfer couldn't be started for some reason + throw new IllegalStateException("unable to start chunked data transfer"); + } + }); + } + + /** + * {@inheritDoc} + */ + @Override + public @NonNull CompletableFuture queryToTempFile() { + var tempFile = FileUtil.createTempFile(); + return this.queryToPath(tempFile); + } + + /** + * {@inheritDoc} + */ + @Override + public @NonNull CompletableFuture queryToPath(@NonNull Path target) { + return this.query().thenApply(stream -> { + try (stream) { + FileUtil.copy(stream, target); + return target; + } catch (IOException exception) { + throw new UncheckedIOException(exception); + } + }); + } +} diff --git a/driver/src/main/java/eu/cloudnetservice/driver/network/chunk/event/FileQueryChannelMessageListener.java b/driver/src/main/java/eu/cloudnetservice/driver/network/chunk/event/FileQueryChannelMessageListener.java new file mode 100644 index 0000000000..9ffe58df03 --- /dev/null +++ b/driver/src/main/java/eu/cloudnetservice/driver/network/chunk/event/FileQueryChannelMessageListener.java @@ -0,0 +1,100 @@ +/* + * Copyright 2019-2024 CloudNetService team & contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package eu.cloudnetservice.driver.network.chunk.event; + +import eu.cloudnetservice.driver.event.EventListener; +import eu.cloudnetservice.driver.event.EventManager; +import eu.cloudnetservice.driver.event.events.channel.ChannelMessageReceiveEvent; +import eu.cloudnetservice.driver.network.buffer.DataBuf; +import eu.cloudnetservice.driver.network.buffer.DataBufFactory; +import eu.cloudnetservice.driver.network.def.NetworkConstants; +import jakarta.inject.Inject; +import lombok.NonNull; + +/** + * A listener for channel messages that request a query transfer of a file. + * + * @since 4.0 + */ +public final class FileQueryChannelMessageListener { + + private final EventManager eventManager; + + /** + * Constructs a new query channel message listener instance. + * + * @param eventManager the event handler to use for posting request events. + * @throws NullPointerException if the given event manager is null. + */ + @Inject + public FileQueryChannelMessageListener(@NonNull EventManager eventManager) { + this.eventManager = eventManager; + } + + /** + * Constructs the response data for a request. The information contains the single information if the transfer + * started. + * + * @param transferStarted true if a handler was found and the transfer was started, false otherwise. + * @return the response data for the remote network component. + */ + private static @NonNull DataBuf constructRequestResponse(boolean transferStarted) { + return DataBufFactory.defaultFactory().createWithExpectedSize(1).writeBoolean(transferStarted); + } + + /** + * Handles chunked file query requests that come in via channel message. + * + * @param event the channel message receive event for any channel message. + * @throws NullPointerException if the given event is null. + */ + @EventListener + public void handleFileQueryRequest(@NonNull ChannelMessageReceiveEvent event) { + var channel = event.networkChannel(); + var request = event.channelMessage(); + if (!event.query() + || !request.channel().equals(NetworkConstants.INTERNAL_MSG_CHANNEL) + || !request.message().equals("chunked_query_file")) { + return; + } + + // read the request data & try to resolve a handler for the request + var requestData = request.content(); + var chunkSize = requestData.readInt(); + var chunkedSessionId = requestData.readUniqueId(); + var requestedDataId = requestData.readString(); + + var requestEvent = this.eventManager.callEvent(new FileQueryRequestEvent(requestedDataId, requestData)); + var responseHandlerBuilder = requestEvent.responseHandler(); + if (responseHandlerBuilder == null) { + // no handler for the request provided + var responseData = constructRequestResponse(false); + event.binaryResponse(responseData); + } else { + // finish the response handler construct, start the transfer & respond with a success response + responseHandlerBuilder + .toChannels(channel) + .chunkSize(chunkSize) + .transferChannel("query:dummy") + .sessionUniqueId(chunkedSessionId) + .build() + .transferChunkedData(); + var responseData = constructRequestResponse(true); + event.binaryResponse(responseData); + } + } +} diff --git a/driver/src/main/java/eu/cloudnetservice/driver/network/chunk/event/FileQueryRequestEvent.java b/driver/src/main/java/eu/cloudnetservice/driver/network/chunk/event/FileQueryRequestEvent.java new file mode 100644 index 0000000000..3069b5141e --- /dev/null +++ b/driver/src/main/java/eu/cloudnetservice/driver/network/chunk/event/FileQueryRequestEvent.java @@ -0,0 +1,86 @@ +/* + * Copyright 2019-2024 CloudNetService team & contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package eu.cloudnetservice.driver.network.chunk.event; + +import eu.cloudnetservice.driver.event.Event; +import eu.cloudnetservice.driver.network.buffer.DataBuf; +import eu.cloudnetservice.driver.network.chunk.ChunkedPacketSender; +import lombok.NonNull; +import org.jetbrains.annotations.Nullable; + +/** + * An event called when a request for a chunked file transmission is received to determine the handler for the chunked + * transmission. If no handler gets set by the register event listeners, the request gets rejected. + * + * @since 4.0 + */ +public final class FileQueryRequestEvent extends Event { + + private final String dataId; + private final DataBuf requestData; + + private ChunkedPacketSender.Builder responseHandler; + + /** + * Constructs a new file query request event. + * + * @param dataId the id of the data that is requested by the remote. + * @param requestData the full request data, possibly containing further information about the requested data. + * @throws NullPointerException if the given data id or request data is null. + */ + public FileQueryRequestEvent(@NonNull String dataId, @NonNull DataBuf requestData) { + this.dataId = dataId; + this.requestData = requestData; + } + + /** + * Get the id of the requested data. + * + * @return the id of the requested data. + */ + public @NonNull String dataId() { + return this.dataId; + } + + /** + * Get the full request content, possibly containing further information about the requested data. + * + * @return the full request content. + */ + public @NonNull DataBuf requestData() { + return this.requestData; + } + + /** + * Get the response handler builder that is responsible for responding with the requested data. Can be null if no + * event listener set a response handler yet. + * + * @return the response handler builder that is responsible for responding with the requested data. + */ + public @Nullable ChunkedPacketSender.Builder responseHandler() { + return this.responseHandler; + } + + /** + * Sets the response handler to use for the request. + * + * @param responseHandler the response handler to use for the request. + */ + public void responseHandler(@Nullable ChunkedPacketSender.Builder responseHandler) { + this.responseHandler = responseHandler; + } +} diff --git a/driver/src/main/java/eu/cloudnetservice/driver/template/defaults/RemoteTemplateStorage.java b/driver/src/main/java/eu/cloudnetservice/driver/template/defaults/RemoteTemplateStorage.java index bd7fcd3652..fb1cbcf2e9 100644 --- a/driver/src/main/java/eu/cloudnetservice/driver/template/defaults/RemoteTemplateStorage.java +++ b/driver/src/main/java/eu/cloudnetservice/driver/template/defaults/RemoteTemplateStorage.java @@ -20,12 +20,11 @@ import eu.cloudnetservice.common.io.ListenableOutputStream; import eu.cloudnetservice.common.io.ZipUtil; import eu.cloudnetservice.driver.ComponentInfo; -import eu.cloudnetservice.driver.channel.ChannelMessage; import eu.cloudnetservice.driver.network.NetworkClient; import eu.cloudnetservice.driver.network.buffer.DataBuf; +import eu.cloudnetservice.driver.network.chunk.ChunkedFileQueryBuilder; import eu.cloudnetservice.driver.network.chunk.ChunkedPacketSender; import eu.cloudnetservice.driver.network.chunk.TransferStatus; -import eu.cloudnetservice.driver.network.def.NetworkConstants; import eu.cloudnetservice.driver.network.rpc.annotation.RPCInvocationTarget; import eu.cloudnetservice.driver.service.ServiceTemplate; import eu.cloudnetservice.driver.template.TemplateStorage; @@ -34,9 +33,6 @@ import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import lombok.NonNull; import org.jetbrains.annotations.Nullable; @@ -107,29 +103,21 @@ public boolean deploy(@NonNull ServiceTemplate target, @NonNull InputStream inpu .toChannels(this.networkClient.firstChannel()) .build() .transferChunkedData() - .get(5, TimeUnit.MINUTES, TransferStatus.FAILURE) == TransferStatus.SUCCESS; + .thenApply(status -> status == TransferStatus.SUCCESS) + .join(); } /** * {@inheritDoc} */ @Override - public @Nullable InputStream zipTemplate(@NonNull ServiceTemplate template) throws IOException { - // send a request for the template to the node - var responseId = UUID.randomUUID(); - var response = ChannelMessage.builder() - .message("remote_templates_zip_template") - .channel(NetworkConstants.INTERNAL_MSG_CHANNEL) - .targetNode(this.componentInfo.nodeUniqueId()) - .buffer(DataBuf.empty().writeString(this.name).writeObject(template).writeUniqueId(responseId)) - .build() - .sendSingleQuery(); - // check if we got a response - if (response == null || !response.content().readBoolean()) { - return null; - } - // the file is transferred and should be readable - return Files.newInputStream(FileUtil.TEMP_DIR.resolve(responseId.toString()), StandardOpenOption.DELETE_ON_CLOSE); + public @Nullable InputStream zipTemplate(@NonNull ServiceTemplate template) { + return ChunkedFileQueryBuilder.create() + .dataIdentifier("remote_templates_zip_template") + .requestFromNode(this.componentInfo.nodeUniqueId()) + .configureMessageBuffer(buffer -> buffer.writeString(this.name).writeObject(template)) + .query() + .join(); } /** @@ -174,15 +162,18 @@ public boolean deploy(@NonNull ServiceTemplate target, @NonNull InputStream inpu ) throws IOException { return new ListenableOutputStream<>( Files.newOutputStream(localPath), - $ -> ChunkedPacketSender.forFileTransfer() + _ -> ChunkedPacketSender.forFileTransfer() .forFile(localPath) .transferChannel("deploy_single_file") .toChannels(this.networkClient.firstChannel()) - .withExtraData( - DataBuf.empty().writeString(this.name).writeObject(template).writeString(path).writeBoolean(append)) + .withExtraData(DataBuf.empty() + .writeString(this.name) + .writeObject(template) + .writeString(path) + .writeBoolean(append)) .build() .transferChunkedData() - .get(5, TimeUnit.MINUTES, null)); + .join()); } /** @@ -193,20 +184,11 @@ public boolean deploy(@NonNull ServiceTemplate target, @NonNull InputStream inpu @NonNull ServiceTemplate template, @NonNull String path ) throws IOException { - // send a request for the file to the node - var responseId = UUID.randomUUID(); - var response = ChannelMessage.builder() - .message("remote_templates_template_file") - .channel(NetworkConstants.INTERNAL_MSG_CHANNEL) - .targetNode(this.componentInfo.nodeUniqueId()) - .buffer(DataBuf.empty().writeString(path).writeString(this.name).writeObject(template).writeUniqueId(responseId)) - .build() - .sendSingleQuery(); - // check if we got a response - if (response == null || !response.content().readBoolean()) { - return null; - } - // the file is transferred and should be readable - return Files.newInputStream(FileUtil.TEMP_DIR.resolve(responseId.toString()), StandardOpenOption.DELETE_ON_CLOSE); + return ChunkedFileQueryBuilder.create() + .dataIdentifier("remote_templates_template_file") + .requestFromNode(this.componentInfo.nodeUniqueId()) + .configureMessageBuffer(buffer -> buffer.writeString(this.name).writeObject(template).writeString(path)) + .query() + .join(); } } diff --git a/node/src/main/java/eu/cloudnetservice/node/Node.java b/node/src/main/java/eu/cloudnetservice/node/Node.java index 4f102c4a65..5730a819d0 100644 --- a/node/src/main/java/eu/cloudnetservice/node/Node.java +++ b/node/src/main/java/eu/cloudnetservice/node/Node.java @@ -35,6 +35,7 @@ import eu.cloudnetservice.driver.module.DefaultModuleDependencyLoader; import eu.cloudnetservice.driver.module.ModuleProvider; import eu.cloudnetservice.driver.network.NetworkServer; +import eu.cloudnetservice.driver.network.chunk.event.FileQueryChannelMessageListener; import eu.cloudnetservice.driver.network.def.NetworkConstants; import eu.cloudnetservice.driver.network.netty.NettyUtil; import eu.cloudnetservice.driver.network.rpc.factory.RPCFactory; @@ -454,6 +455,7 @@ private void finishStartup( // register listeners & post node startup finish eventManager.registerListener(callbackListener); eventManager.callEvent(new CloudNetNodePostInitializationEvent()); + eventManager.registerListener(FileQueryChannelMessageListener.class); // notify that we are done & start the main tick loop LOGGER.info(I18n.trans("start-done", Duration.between(startInstant, Instant.now()).toMillis())); diff --git a/node/src/main/java/eu/cloudnetservice/node/network/chunk/FileDeployCallbackListener.java b/node/src/main/java/eu/cloudnetservice/node/network/chunk/FileDeployCallbackListener.java index 8742bbbeaf..26a4821c2b 100644 --- a/node/src/main/java/eu/cloudnetservice/node/network/chunk/FileDeployCallbackListener.java +++ b/node/src/main/java/eu/cloudnetservice/node/network/chunk/FileDeployCallbackListener.java @@ -17,21 +17,18 @@ package eu.cloudnetservice.node.network.chunk; import eu.cloudnetservice.driver.event.EventListener; -import eu.cloudnetservice.driver.event.events.channel.ChannelMessageReceiveEvent; import eu.cloudnetservice.driver.event.events.chunk.ChunkedPacketSessionOpenEvent; import eu.cloudnetservice.driver.network.buffer.DataBuf; import eu.cloudnetservice.driver.network.chunk.ChunkedPacketSender; -import eu.cloudnetservice.driver.network.chunk.TransferStatus; import eu.cloudnetservice.driver.network.chunk.defaults.DefaultFileChunkedPacketHandler; -import eu.cloudnetservice.driver.network.def.NetworkConstants; +import eu.cloudnetservice.driver.network.chunk.event.FileQueryRequestEvent; import eu.cloudnetservice.driver.service.ServiceTemplate; import eu.cloudnetservice.driver.template.TemplateStorage; import eu.cloudnetservice.driver.template.TemplateStorageProvider; -import io.vavr.CheckedFunction2; import jakarta.inject.Inject; import jakarta.inject.Singleton; -import java.io.InputStream; -import java.util.concurrent.TimeUnit; +import java.io.IOException; +import java.util.function.BiConsumer; import lombok.NonNull; @Singleton @@ -44,7 +41,7 @@ public final class FileDeployCallbackListener { private final TemplateStorageProvider templateStorageProvider; @Inject - public FileDeployCallbackListener( + FileDeployCallbackListener( @NonNull TemplateDeployCallback templateDeployCallback, @NonNull StaticServiceDeployCallback serviceDeployCallback, @NonNull TemplateFileDeployCallback templateFileDeployCallback, @@ -58,74 +55,53 @@ public FileDeployCallbackListener( @EventListener public void handle(@NonNull ChunkedPacketSessionOpenEvent event) { - switch (event.session().transferChannel()) { - case "deploy_service_template" -> event.handler(new DefaultFileChunkedPacketHandler( - event.session(), - this.templateDeployCallback)); - case "deploy_single_file" -> event.handler(new DefaultFileChunkedPacketHandler( - event.session(), - this.templateFileDeployCallback)); - case "deploy_static_service" -> event.handler(new DefaultFileChunkedPacketHandler( - event.session(), - this.serviceDeployCallback)); - default -> { - } + var callback = switch (event.session().transferChannel()) { + case "deploy_static_service" -> this.serviceDeployCallback; + case "deploy_single_file" -> this.templateFileDeployCallback; + case "deploy_service_template" -> this.templateDeployCallback; + default -> null; + }; + if (callback != null) { + event.handler(new DefaultFileChunkedPacketHandler(event.session(), callback)); } } @EventListener - public void handle(@NonNull ChannelMessageReceiveEvent event) { - if (event.channel().equals(NetworkConstants.INTERNAL_MSG_CHANNEL)) { - switch (event.message()) { - case "remote_templates_zip_template" -> this.handleInputRequest(event, TemplateStorage::zipTemplate); - case "remote_templates_template_file" -> { - // read the path info first - var path = event.content().readString(); - this.handleInputRequest(event, (storage, template) -> storage.newInputStream(template, path)); + public void handle(@NonNull FileQueryRequestEvent event) { + switch (event.dataId()) { + case "remote_templates_zip_template" -> this.handleInputRequest(event.requestData(), (storage, template) -> { + try { + var zipInputStream = storage.zipTemplate(template); + if (zipInputStream != null) { + var responseHandler = ChunkedPacketSender.forFileTransfer().source(zipInputStream); + event.responseHandler(responseHandler); + } + } catch (IOException _) { } - default -> { + }); + case "remote_templates_template_file" -> this.handleInputRequest(event.requestData(), (storage, template) -> { + try { + var filePath = event.requestData().readString(); + var fileInputStream = storage.newInputStream(template, filePath); + if (fileInputStream != null) { + var responseHandler = ChunkedPacketSender.forFileTransfer().source(fileInputStream); + event.responseHandler(responseHandler); + } + } catch (IOException _) { } - } + }); } } private void handleInputRequest( - @NonNull ChannelMessageReceiveEvent event, - @NonNull CheckedFunction2 streamOpener + @NonNull DataBuf requestData, + @NonNull BiConsumer handler ) { - // read the information - var storageName = event.content().readString(); - var template = event.content().readObject(ServiceTemplate.class); - var responseId = event.content().readUniqueId(); - - // get the storage + var storageName = requestData.readString(); + var template = requestData.readObject(ServiceTemplate.class); var storage = this.templateStorageProvider.templateStorage(storageName); - if (storage == null) { - // missing storage - no result - event.binaryResponse(DataBuf.empty().writeBoolean(false)); - return; - } - - // zip the template and return to the sender - try (var zip = streamOpener.apply(storage, template)) { - // check if the template exists - if (zip == null) { - event.binaryResponse(DataBuf.empty().writeBoolean(false)); - return; - } - - // send the zip to the requesting component - var status = ChunkedPacketSender.forFileTransfer() - .source(zip) - .sessionUniqueId(responseId) - .toChannels(event.networkChannel()) - .transferChannel("request_template_file_result") - .build() - .transferChunkedData() - .get(5, TimeUnit.MINUTES, TransferStatus.FAILURE); - event.binaryResponse(DataBuf.empty().writeBoolean(status == TransferStatus.SUCCESS)); - } catch (Throwable exception) { - event.binaryResponse(DataBuf.empty().writeBoolean(false)); + if (storage != null && storage.contains(template)) { + handler.accept(storage, template); } } } diff --git a/node/src/main/java/eu/cloudnetservice/node/network/chunk/StaticServiceDeployCallback.java b/node/src/main/java/eu/cloudnetservice/node/network/chunk/StaticServiceDeployCallback.java index 5a45cfe6fc..e609c69a5e 100644 --- a/node/src/main/java/eu/cloudnetservice/node/network/chunk/StaticServiceDeployCallback.java +++ b/node/src/main/java/eu/cloudnetservice/node/network/chunk/StaticServiceDeployCallback.java @@ -43,7 +43,7 @@ public StaticServiceDeployCallback(@NonNull CloudServiceManager cloudServiceMana } @Override - public void handleSessionComplete( + public boolean handleSessionComplete( @NonNull ChunkSessionInformation information, @NonNull InputStream dataInput ) { @@ -57,7 +57,7 @@ public void handleSessionComplete( // check if the service path exists, and we can overwrite it if (Files.exists(servicePath) && !overwriteService) { LOGGER.severe(I18n.trans("command-cluster-push-static-service-existing", service)); - return; + return true; } // delete the old contents @@ -70,5 +70,7 @@ public void handleSessionComplete( } else { LOGGER.severe(I18n.trans("command-cluster-push-static-service-running-remote", service)); } + + return true; } } diff --git a/node/src/main/java/eu/cloudnetservice/node/network/chunk/TemplateDeployCallback.java b/node/src/main/java/eu/cloudnetservice/node/network/chunk/TemplateDeployCallback.java index b97a93ca09..8bcd52cd91 100644 --- a/node/src/main/java/eu/cloudnetservice/node/network/chunk/TemplateDeployCallback.java +++ b/node/src/main/java/eu/cloudnetservice/node/network/chunk/TemplateDeployCallback.java @@ -42,7 +42,7 @@ public TemplateDeployCallback( } @Override - public void handleSessionComplete( + public boolean handleSessionComplete( @NonNull ChunkSessionInformation information, @NonNull InputStream dataInput ) { @@ -68,5 +68,7 @@ public void handleSessionComplete( this.mainThread.resume(); } } + + return true; } } diff --git a/node/src/main/java/eu/cloudnetservice/node/network/chunk/TemplateFileDeployCallback.java b/node/src/main/java/eu/cloudnetservice/node/network/chunk/TemplateFileDeployCallback.java index 556fb7b3c0..96413a82f1 100644 --- a/node/src/main/java/eu/cloudnetservice/node/network/chunk/TemplateFileDeployCallback.java +++ b/node/src/main/java/eu/cloudnetservice/node/network/chunk/TemplateFileDeployCallback.java @@ -38,7 +38,7 @@ public TemplateFileDeployCallback(@NonNull TemplateStorageProvider templateStora } @Override - public void handleSessionComplete( + public boolean handleSessionComplete( @NonNull ChunkSessionInformation information, @NonNull InputStream dataInput ) throws IOException { @@ -59,5 +59,7 @@ public void handleSessionComplete( FileUtil.copy(dataInput, out); } } + + return true; } }