Skip to content

Commit

Permalink
standardize file querying from a node
Browse files Browse the repository at this point in the history
  • Loading branch information
derklaro committed Jul 1, 2024
1 parent 76ccd8a commit 8ea6b75
Show file tree
Hide file tree
Showing 14 changed files with 575 additions and 112 deletions.
2 changes: 1 addition & 1 deletion checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@
</module>
<module name="SingleLineJavadoc"/>
<module name="EmptyCatchBlock">
<property name="exceptionVariableName" value="ignored|expected"/>
<property name="exceptionVariableName" value="ignored|expected|_"/>
</module>
<module name="CommentsIndentation">
<property name="tokens" value="SINGLE_LINE_COMMENT, BLOCK_COMMENT_BEGIN"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public final class FileUtil {
.filter(prov -> prov.getScheme().equalsIgnoreCase("jar"))
.findFirst()
.orElseThrow(() -> new ExceptionInInitializerError("Unable to find a file system provider supporting jars"));

// pre-create the temporary directory
FileUtil.createDirectory(TEMP_DIR);
}

private FileUtil() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2019-2024 CloudNetService team & contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package eu.cloudnetservice.driver.network.chunk;

import eu.cloudnetservice.driver.network.buffer.DataBuf;
import eu.cloudnetservice.driver.network.chunk.defaults.builder.DefaultChunkedFileQueryBuilder;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import lombok.NonNull;

/**
* A builder that can be used to query a file from a remote network component.
*
* @since 4.0
*/
public interface ChunkedFileQueryBuilder {

/**
* Constructs a new default implementation of this builder.
*
* @return a new default implementation of this builder.
*/
static @NonNull ChunkedFileQueryBuilder create() {
return new DefaultChunkedFileQueryBuilder();
}

/**
* The size of the chunks in bytes that will be transferred from the remote to the caller.
*
* @param chunkSize the size of the chunks that should be transferred from the remote to the caller.
* @return this builder, for chaining.
* @throws IllegalArgumentException if the given chunk size is not positive.
*/
@NonNull
ChunkedFileQueryBuilder chunkSize(int chunkSize);

/**
* The identifier for the data should be transferred from the remote to the caller.
*
* @param dataIdentifier the identifier of the data that should be transferred.
* @return this builder, for chaining.
* @throws NullPointerException if the given identifier is null.
* @throws IllegalArgumentException if the given identifier is empty.
*/
@NonNull
ChunkedFileQueryBuilder dataIdentifier(@NonNull String dataIdentifier);

/**
* Sets that the file data should be requested from the node with the given id. Note: the source must be directly
* available to the caller, else the data will never reach the caller.
*
* @param nodeId the id of the node to request the data from.
* @return this builder, for chaining.
* @throws NullPointerException if the given node id is null.
*/
@NonNull
ChunkedFileQueryBuilder requestFromNode(@NonNull String nodeId);

/**
* Sets that the file data should be requested from the service with the given name. Note: the source must be directly
* available to the caller, else the data will never reach the caller.
*
* @param serviceName the name of the service to request the data from.
* @return this builder, for chaining.
* @throws NullPointerException if the given service name is null.
*/
@NonNull
ChunkedFileQueryBuilder requestFromService(@NonNull String serviceName);

/**
* A consumer that is called when the actual request is sent to the data source. The consumer can append additional
* information to the buffer that is relevant for the remote side to identify the data to transmit.
*
* @param bufferConfigurer the configurator for additional transmitted information about the request.
* @return this builder, for chaining.
* @throws NullPointerException if the given buffer configurator is null.
*/
@NonNull
ChunkedFileQueryBuilder configureMessageBuffer(@NonNull Consumer<DataBuf.Mutable> bufferConfigurer);

/**
* Queries the data from the remote and completes with an input stream when the transmission was successful.
*
* @return a future completed with an input stream of the transmitted data.
*/
@NonNull
CompletableFuture<InputStream> query();

/**
* Queries the data from the remote and redirects it into a temporary file. The path to the file is wrapped in the
* returned future.
*
* @return a future completed with the path to which the transmitted contents were written.
*/
@NonNull
CompletableFuture<Path> queryToTempFile();

/**
* Queries the data from the remote and redirects it into the given file path.
*
* @param target the path to the file into which the response data should be written.
* @return a future completed with the path to which the transmitted contents were written.
*/
@NonNull
CompletableFuture<Path> queryToPath(@NonNull Path target);
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ interface Callback {
*
* @param information the information about the chunk session which were sent initially.
* @param dataInput the stream of data sent to this component in this chunked session.
* @return true if the full data was consumed and the given stream can be closed, false otherwise.
* @throws IOException in case an i/o exception happens during result handling.
*/
void handleSessionComplete(
boolean handleSessionComplete(
@NonNull ChunkSessionInformation information,
@NonNull InputStream dataInput) throws IOException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import eu.cloudnetservice.driver.network.chunk.TransferStatus;
import eu.cloudnetservice.driver.network.chunk.data.ChunkSessionInformation;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -138,8 +139,16 @@ public boolean handleChunkPart(int chunkPosition, @NonNull DataBuf dataBuf) {

// call the write completion handler, if present
if (this.writeCompleteHandler != null) {
try (var inputStream = Files.newInputStream(this.tempFilePath, StandardOpenOption.DELETE_ON_CLOSE)) {
this.writeCompleteHandler.handleSessionComplete(this.chunkSessionInformation, inputStream);
var closeStream = true;
var stream = InputStream.nullInputStream();
try {
// open the stream to the data and post it to the write handler
stream = Files.newInputStream(this.tempFilePath, StandardOpenOption.DELETE_ON_CLOSE);
closeStream = this.writeCompleteHandler.handleSessionComplete(this.chunkSessionInformation, stream);
} finally {
if (closeStream) {
stream.close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright 2019-2024 CloudNetService team & contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package eu.cloudnetservice.driver.network.chunk.defaults.builder;

import com.google.common.base.Preconditions;
import eu.cloudnetservice.common.io.FileUtil;
import eu.cloudnetservice.driver.channel.ChannelMessage;
import eu.cloudnetservice.driver.channel.ChannelMessageTarget;
import eu.cloudnetservice.driver.inject.InjectionLayer;
import eu.cloudnetservice.driver.network.buffer.DataBuf;
import eu.cloudnetservice.driver.network.buffer.DataBufFactory;
import eu.cloudnetservice.driver.network.chunk.ChunkedFileQueryBuilder;
import eu.cloudnetservice.driver.network.chunk.data.ChunkSessionInformation;
import eu.cloudnetservice.driver.network.chunk.defaults.ChunkedSessionRegistry;
import eu.cloudnetservice.driver.network.chunk.defaults.DefaultFileChunkedPacketHandler;
import eu.cloudnetservice.driver.network.def.NetworkConstants;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import lombok.NonNull;

/**
* Default implementation of a builder for a file query from a remote network component.
*
* @since 4.0
*/
public class DefaultChunkedFileQueryBuilder implements ChunkedFileQueryBuilder {

private static final DataBuf EMPTY_BUFFER;

static {
// buffer is never used, so we just allocate an empty buffer and release it immediately
EMPTY_BUFFER = DataBufFactory.defaultFactory().createWithExpectedSize(0);
EMPTY_BUFFER.release();
}

private String dataIdentifier;
private ChannelMessageTarget dataSource;
private Consumer<DataBuf.Mutable> messageBufferConfigurator;
private int chunkSize = DefaultChunkedPacketSenderBuilder.DEFAULT_CHUNK_SIZE;

/**
* {@inheritDoc}
*/
@Override
public @NonNull ChunkedFileQueryBuilder chunkSize(int chunkSize) {
Preconditions.checkArgument(chunkSize > 0, "chunk size must be greater than 0");
this.chunkSize = chunkSize;
return this;
}

/**
* {@inheritDoc}
*/
@Override
public @NonNull ChunkedFileQueryBuilder dataIdentifier(@NonNull String dataIdentifier) {
Preconditions.checkArgument(!dataIdentifier.isBlank(), "data identifier cannot be empty");
this.dataIdentifier = dataIdentifier;
return this;
}

/**
* {@inheritDoc}
*/
@Override
public @NonNull ChunkedFileQueryBuilder requestFromNode(@NonNull String nodeId) {
this.dataSource = ChannelMessageTarget.of(ChannelMessageTarget.Type.NODE, nodeId);
return this;
}

/**
* {@inheritDoc}
*/
@Override
public @NonNull ChunkedFileQueryBuilder requestFromService(@NonNull String serviceName) {
this.dataSource = ChannelMessageTarget.of(ChannelMessageTarget.Type.SERVICE, serviceName);
return this;
}

/**
* {@inheritDoc}
*/
@Override
public @NonNull ChunkedFileQueryBuilder configureMessageBuffer(@NonNull Consumer<DataBuf.Mutable> bufferConfigurer) {
this.messageBufferConfigurator = bufferConfigurer;
return this;
}

/**
* {@inheritDoc}
*/
@Override
public @NonNull CompletableFuture<InputStream> query() {
Preconditions.checkNotNull(this.dataIdentifier, "no data id provided");
Preconditions.checkNotNull(this.dataSource, "no data source provided");

// configure the base message buffer
var sessionId = UUID.randomUUID();
var queryBuffer = DataBuf.empty()
.writeInt(this.chunkSize)
.writeUniqueId(sessionId)
.writeString(this.dataIdentifier);
if (this.messageBufferConfigurator != null) {
this.messageBufferConfigurator.accept(queryBuffer);
}

// register the session that is responsible for handling the response
var responseFuture = new CompletableFuture<InputStream>();
var sessionRegistry = InjectionLayer.boot().instance(ChunkedSessionRegistry.class);
var sessionInfo = new ChunkSessionInformation(this.chunkSize, sessionId, "query:dummy", EMPTY_BUFFER);
var handler = new DefaultFileChunkedPacketHandler(sessionInfo, (_, stream) -> !responseFuture.complete(stream));
sessionRegistry.registerSession(sessionId, handler);

// send the request to transmit the data
var channelMessage = ChannelMessage.builder()
.channel(NetworkConstants.INTERNAL_MSG_CHANNEL)
.message("chunked_query_file")
.target(this.dataSource)
.buffer(queryBuffer)
.build();
return channelMessage
.sendSingleQueryAsync()
.thenCompose(response -> {
var responseData = response.content();
if (responseData.readBoolean()) {
// transfer started successfully
return responseFuture;
} else {
// transfer couldn't be started for some reason
throw new IllegalStateException("unable to start chunked data transfer");
}
});
}

/**
* {@inheritDoc}
*/
@Override
public @NonNull CompletableFuture<Path> queryToTempFile() {
var tempFile = FileUtil.createTempFile();
return this.queryToPath(tempFile);
}

/**
* {@inheritDoc}
*/
@Override
public @NonNull CompletableFuture<Path> queryToPath(@NonNull Path target) {
return this.query().thenApply(stream -> {
try (stream) {
FileUtil.copy(stream, target);
return target;
} catch (IOException exception) {
throw new UncheckedIOException(exception);
}
});
}
}
Loading

0 comments on commit 8ea6b75

Please sign in to comment.