Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract the native component without copying it into memory #198

Merged
merged 4 commits into from
Apr 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions java/amazon-kinesis-producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,6 @@
<artifactId>protobuf-java</artifactId>
<version>2.6.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.13</version>
<optional>true</optional>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
Expand All @@ -109,13 +102,29 @@
<artifactId>confluex-mock-http</artifactId>
<version>0.4.3</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.2.22</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.7</version>
<scope>test</scope>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,6 @@ private void deletePipes() {
}

private void startChildProcess() throws IOException, InterruptedException {
log.info("Asking for trace");
List<String> args = new ArrayList<>(Arrays.asList(pathToExecutable, "-o", outPipe.getAbsolutePath(), "-i",
inPipe.getAbsolutePath(), "-c", protobufToHex(config.toProtobufMessage()), "-k",
protobufToHex(makeSetCredentialsMessage(config.getCredentialsProvider(), false)), "-t"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.producer;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.FileLock;
import java.security.DigestInputStream;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.util.Arrays;

import javax.xml.bind.DatatypeConverter;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HashedFileCopier {
private static final Logger log = LoggerFactory.getLogger(HashedFileCopier.class);

static final String MESSAGE_DIGEST_ALGORITHM = "SHA-1";
static final String TEMP_PREFIX = "kpl";
static final String TEMP_SUFFIX = ".tmp";
static final String LOCK_SUFFIX = ".lock";

public static File copyFileFrom(InputStream sourceData, File destinationDirectory, String fileNameFormat)
throws Exception {
File tempFile = null;
try {
tempFile = File.createTempFile(TEMP_PREFIX, TEMP_SUFFIX, destinationDirectory);
log.debug("Extracting file with format {}", fileNameFormat);
FileOutputStream fileOutputStream = new FileOutputStream(tempFile);

DigestOutputStream digestOutputStream = new DigestOutputStream(fileOutputStream,
MessageDigest.getInstance(MESSAGE_DIGEST_ALGORITHM));
IOUtils.copy(sourceData, digestOutputStream);
digestOutputStream.close();
byte[] digest = digestOutputStream.getMessageDigest().digest();
log.debug("Calculated digest of new file: {}", Arrays.toString(digest));
String digestHex = DatatypeConverter.printHexBinary(digest);
File finalFile = new File(destinationDirectory, String.format(fileNameFormat, digestHex));
File lockFile = new File(destinationDirectory, String.format(fileNameFormat + LOCK_SUFFIX, digestHex));
log.debug("Preparing to check and copy {} to {}", tempFile.getAbsolutePath(), finalFile.getAbsolutePath());
try (FileOutputStream lockFOS = new FileOutputStream(lockFile);
FileLock lock = lockFOS.getChannel().lock()) {
if (finalFile.exists() && finalFile.length() == tempFile.length()) {
byte[] existingFileDigest = null;
try (DigestInputStream digestInputStream = new DigestInputStream(new FileInputStream(finalFile),
MessageDigest.getInstance(MESSAGE_DIGEST_ALGORITHM))) {
byte[] discardedBytes = new byte[8192];
while (digestInputStream.read(discardedBytes) != -1) {
//
// This is just used for the side affect of the digest input stream
//
}
existingFileDigest = digestInputStream.getMessageDigest().digest();
}
if (Arrays.equals(digest, existingFileDigest)) {
//
// The existing file matches the expected file, it's ok to just drop out now
//
log.info("'{}' already exists, and matches. Not overwriting.", finalFile.getAbsolutePath());
return finalFile;
}
log.warn(
"Detected a mismatch between the existing file, and the new file. "
+ "Will overwrite the existing file. " + "Existing: {} -- New File: {}",
Arrays.toString(existingFileDigest), Arrays.toString(digest));
}

if (!tempFile.renameTo(finalFile)) {
log.error("Failed to rename '{}' to '{}'", tempFile.getAbsolutePath(), finalFile.getAbsolutePath());
throw new IOException("Failed to rename extracted file");
}
}
return finalFile;
} finally {
if (tempFile != null && tempFile.exists()) {
if (!tempFile.delete()) {
log.warn("Unable to delete temp file: {}", tempFile.getAbsolutePath());
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -861,8 +861,21 @@ private String extractBinaries() {
if (binPath != null && !binPath.trim().isEmpty()) {
pathToExecutable = binPath.trim();
log.warn("Using non-default native binary at " + pathToExecutable);
pathToLibDir = "";
return "";

File parent = new File(binPath).getParentFile();
pathToLibDir = parent.getAbsolutePath();
CertificateExtractor certificateExtractor = new CertificateExtractor();

try {
String caDirectory = certificateExtractor
.extractCertificates(parent.getAbsoluteFile());
watchFiles.addAll(certificateExtractor.getExtractedCertificates());
FileAgeManager.instance().registerFiles(watchFiles);
return caDirectory;
} catch (IOException ioex) {
log.error("Exception while extracting certificates. Returning no CA directory", ioex);
return "";
}
} else {
log.info("Extracting binaries to " + tmpDir);
try {
Expand All @@ -873,39 +886,14 @@ private String extractBinaries() {

String extension = os.equals("windows") ? ".exe" : "";
String executableName = "kinesis_producer" + extension;
byte[] bin = IOUtils.toByteArray(
this.getClass().getClassLoader().getResourceAsStream(root + "/" + os + "/" + executableName));
MessageDigest md = MessageDigest.getInstance("SHA1");
String mdHex = DatatypeConverter.printHexBinary(md.digest(bin)).toLowerCase();

pathToExecutable = Paths.get(pathToTmpDir, "kinesis_producer_" + mdHex + extension).toString();
File extracted = new File(pathToExecutable);
watchFiles.add(extracted);

// use dedicated lock-file to limit access to executable by a single process
final String pathToLock = Paths.get(pathToTmpDir, "kinesis_producer_" + mdHex + ".lock").toString();
final File lockFile = new File(pathToLock);
try (FileOutputStream lockFOS = new FileOutputStream(lockFile);
FileLock lock = lockFOS.getChannel().lock()) {
if (extracted.exists()) {
boolean contentEqual = false;
if (extracted.length() == bin.length) {
try (InputStream executableIS = new FileInputStream(extracted)) {
byte[] existingBin = IOUtils.toByteArray(executableIS);
contentEqual = Arrays.equals(bin, existingBin);
}
}
if (!contentEqual) {
throw new SecurityException("The contents of the binary " + extracted.getAbsolutePath()
+ " is not what it's expected to be.");
}
} else {
try (OutputStream fos = new FileOutputStream(extracted)) {
IOUtils.write(bin, fos);
}
extracted.setExecutable(true);
}
}
InputStream is = this.getClass().getClassLoader().getResourceAsStream(root + "/" + os + "/" + executableName);
String resultFileFormat = "kinesis_producer_%s" + extension;

File extracted = HashedFileCopier.copyFileFrom(is, tmpDirFile, resultFileFormat);
watchFiles.add(extracted);
extracted.setExecutable(true);
pathToExecutable = extracted.getAbsolutePath();

CertificateExtractor certificateExtractor = new CertificateExtractor();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.producer;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;

import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.DigestInputStream;
import java.security.MessageDigest;

import javax.xml.bind.DatatypeConverter;

import org.apache.commons.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HashedFileCopierTest {

private static final Logger log = LoggerFactory.getLogger(HashedFileCopierTest.class);

private static final String TEST_FILE_PREFIX = "res-file.";
private static final String TEST_FILE_SUFFIX = ".txt";
private static final String TEST_FILE_FORMAT = TEST_FILE_PREFIX + "%s" + TEST_FILE_SUFFIX;

private File tempDir;

@Before
public void before() throws Exception {
tempDir = Files.createTempDirectory("kpl-unit-tests").toFile();
}

@After
public void after() throws Exception {
int extraTempFiles = 0;
if (tempDir != null && tempDir.isDirectory()) {
DirectoryStream<Path> directoryStream = Files.newDirectoryStream(tempDir.toPath());
for (Path entry : directoryStream) {
String filename = entry.toFile().getName();
if (filename.startsWith(HashedFileCopier.TEMP_PREFIX)
&& filename.endsWith(HashedFileCopier.TEMP_SUFFIX)) {
Files.delete(entry);
extraTempFiles++;
continue;
}
if (filename.startsWith(TEST_FILE_PREFIX) && filename.endsWith(TEST_FILE_SUFFIX)) {
Files.delete(entry);
continue;
}
if (filename.startsWith(TEST_FILE_PREFIX) && filename.endsWith(HashedFileCopier.LOCK_SUFFIX)) {
Files.delete(entry);
continue;
}
log.warn("Unexpected file {} found. Not deleting the file.", entry);
}
Files.delete(tempDir.toPath());
}

assertThat("Copier didn't clean up all temporary files.", extraTempFiles, equalTo(0));
}

@Test
public void normalFileCopyTest() throws Exception {

File resultFile = HashedFileCopier.copyFileFrom(testDataInputStream(), tempDir, TEST_FILE_FORMAT);
File expectedFile = makeTestFile();

assertThat(resultFile, equalTo(expectedFile));
assertThat(expectedFile.exists(), equalTo(true));

byte[] writtenBytes = Files.readAllBytes(resultFile.toPath());
byte[] expectedBytes = IOUtils.toByteArray(testDataInputStream());

assertThat(writtenBytes, equalTo(expectedBytes));

}

@Test
public void fileExistsTest() throws Exception {
File expectedFile = makeTestFile();
try (FileOutputStream fso = new FileOutputStream(expectedFile)) {
IOUtils.copy(testDataInputStream(), fso);
}
File resultFile = HashedFileCopier.copyFileFrom(testDataInputStream(), tempDir, TEST_FILE_FORMAT);
assertThat(resultFile, equalTo(expectedFile));

byte[] expectedData = testDataBytes();
byte[] actualData = Files.readAllBytes(resultFile.toPath());

assertThat(actualData, equalTo(expectedData));
}

@Test
public void lengthMismatchTest() throws Exception {
File expectedFile = makeTestFile();
FileOutputStream fso = new FileOutputStream(expectedFile);
IOUtils.copy(testDataInputStream(), fso);
fso.write("This is some extra crap".getBytes(Charset.forName("UTF-8")));
fso.close();

File resultFile = HashedFileCopier.copyFileFrom(testDataInputStream(), tempDir, TEST_FILE_FORMAT);
assertThat(resultFile, equalTo(expectedFile));

byte[] expectedData = testDataBytes();
byte[] actualData = Files.readAllBytes(resultFile.toPath());

assertThat(actualData, equalTo(expectedData));
}

@Test
public void hashMismatchTest() throws Exception {
File expectedFile = makeTestFile();
byte[] testData = testDataBytes();
testData[10] = (byte)~testData[10];

Files.write(expectedFile.toPath(), testData);

File resultFile = HashedFileCopier.copyFileFrom(testDataInputStream(), tempDir, TEST_FILE_FORMAT);
assertThat(resultFile, equalTo(expectedFile));

byte[] expectedData = testDataBytes();
byte[] actualData = Files.readAllBytes(resultFile.toPath());

assertThat(actualData, equalTo(expectedData));
}

private File makeTestFile() throws Exception {
return new File(tempDir, String.format(TEST_FILE_FORMAT, hexDigestForTestData()));
}

private String hexDigestForTestData() throws Exception {
return DatatypeConverter.printHexBinary(hashForTestData());
}

private byte[] testDataBytes() throws Exception {
return IOUtils.toByteArray(testDataInputStream());
}

private byte[] hashForTestData() throws Exception {
DigestInputStream dis = new DigestInputStream(testDataInputStream(), MessageDigest.getInstance(HashedFileCopier.MESSAGE_DIGEST_ALGORITHM));
IOUtils.toByteArray(dis);
return dis.getMessageDigest().digest();
}

private InputStream testDataInputStream() {
return this.getClass().getClassLoader().getResourceAsStream("test-data/test.txt");
}
}
Loading