Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
ohbitton committed Jul 29, 2024
1 parent b6f60cc commit 4d3ae60
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public class ManagedStreamingIngestClient extends IngestClientBase implements Qu
private final ExponentialRetry exponentialRetryTemplate;
private CloseableHttpClient httpClient = null;
private ManagedStreamingQueuingPolicy queuingPolicy = ManagedStreamingQueuingPolicy.Default;
private static final String fallbackLogString ="Data size is greater than max streaming size according to the policy. Falling back to queued.";
private static final String fallbackLogString = "Data size is greater than max streaming size according to the policy. Falling back to queued.";

/**
* @param dmConnectionString dm connection string
* @return a new ManagedStreamingIngestClient
Expand Down Expand Up @@ -262,7 +263,7 @@ public ManagedStreamingIngestClient(ResourceManager resourceManager,
}

ManagedStreamingIngestClient(StreamingIngestClient streamingIngestClient, QueuedIngestClient queuedIngestClient, ExponentialRetry exponentialRetry) {
this.streamingIngestClient = streamingIngestClient;
this.streamingIngestClient = streamingIngestClient;
this.queuedIngestClient = queuedIngestClient;
exponentialRetryTemplate = exponentialRetry;
}
Expand Down Expand Up @@ -443,7 +444,8 @@ protected IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo
throw new IngestionClientException("Failed to read from stream.", e);
}

StreamSourceInfo managedSourceInfo = new StreamSourceInfo(byteArrayStream, true, sourceId, streamSourceInfo.getCompressionType(), streamSourceInfo.getRawSizeInBytes());
StreamSourceInfo managedSourceInfo = new StreamSourceInfo(byteArrayStream, true, sourceId, streamSourceInfo.getCompressionType(),
streamSourceInfo.getRawSizeInBytes());
try {
IngestionResult result = streamWithRetries(managedSourceInfo, ingestionProperties, null);
if (result != null) {
Expand All @@ -461,8 +463,8 @@ protected IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo
}

/*
Set the policy that handles the logic over which data size would the client choose to directly use queued ingestion
instead of trying streaming ingestion first.
* Set the policy that handles the logic over which data size would the client choose to directly use queued ingestion instead of trying streaming ingestion
* first.
*/
public void setQueuingPolicy(ManagedStreamingQueuingPolicy queuingPolicy) {
this.queuingPolicy = queuingPolicy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static StreamSourceInfo fileToStream(FileSourceInfo fileSourceInfo, boole
} else {
// Raw
streamSourceInfo.setRawSizeInBytes(
(compression != null && format.isCompressible()) ? stream.available() : 0);
(compression != null && format.isCompressible()) ? stream.available() : 0);
}
} catch (IOException e) {
throw new IngestionClientException(ExceptionsUtils.getMessageEx(e), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ static void setUp() throws Exception {
managedStreamingIngestClient = new ManagedStreamingIngestClient(resourceManagerMock, azureStorageClientMock,
streamingClientMock);
queuedIngestClientMock = mock(QueuedIngestClientImpl.class);
managedStreamingIngestClientSpy = spy(new ManagedStreamingIngestClient(mock(StreamingIngestClient.class), queuedIngestClientMock, new ExponentialRetry(1)));
managedStreamingIngestClientSpy = spy(
new ManagedStreamingIngestClient(mock(StreamingIngestClient.class), queuedIngestClientMock, new ExponentialRetry(1)));
}

static ByteArrayInputStream createStreamOfSize(int size) throws UnsupportedEncodingException {
Expand All @@ -80,13 +81,13 @@ static int getStreamSize(InputStream inputStream) throws IOException {
}
return size;
}

@Test
void IngestFromStream_CsvStream() throws Exception {

InputStream inputStream = createStreamOfSize(1);
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream);


// Expect to work and also choose no queuing
OperationStatus status = managedStreamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection()
.get(0).status;
Expand Down Expand Up @@ -162,9 +163,10 @@ void shouldUseQueueingPredicate_DefaultBehavior() {
}

@Test
void ManagedStreaming_BigFile_ShouldQueueTheFullStream() throws IOException, IngestionClientException, IngestionServiceException{
EmptyAvailableByteArrayOutputStream inputStream = new EmptyAvailableByteArrayOutputStream(createStreamOfSize(ManagedStreamingQueuingPolicy.MAX_STREAMING_STREAM_SIZE_BYTES + 10));
int size = inputStream.bb.available();
void ManagedStreaming_BigFile_ShouldQueueTheFullStream() throws IOException, IngestionClientException, IngestionServiceException {
EmptyAvailableByteArrayOutputStream inputStream = new EmptyAvailableByteArrayOutputStream(
createStreamOfSize(ManagedStreamingQueuingPolicy.MAX_STREAMING_STREAM_SIZE_BYTES + 10));
int size = inputStream.bb.available();
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream);
ArgumentCaptor<StreamSourceInfo> streamSourceInfoCaptor = ArgumentCaptor.forClass(StreamSourceInfo.class);

Expand All @@ -176,11 +178,10 @@ void ManagedStreaming_BigFile_ShouldQueueTheFullStream() throws IOException, Ing
Assertions.assertEquals(queuedStreamSize, size);
}

static class EmptyAvailableByteArrayOutputStream extends InputStream
{
static class EmptyAvailableByteArrayOutputStream extends InputStream {
private ByteArrayInputStream bb;

EmptyAvailableByteArrayOutputStream(ByteArrayInputStream bb){
EmptyAvailableByteArrayOutputStream(ByteArrayInputStream bb) {
this.bb = bb;
}

Expand Down

0 comments on commit 4d3ae60

Please sign in to comment.