Skip to content

Commit

Permalink
Upload segment to remote store post refresh
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale authored and Sachin Kale committed Jun 1, 2022
1 parent d58da23 commit 2a4c5a0
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,19 @@

package org.opensearch.index.shard;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;

import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

/**
* RefreshListener implementation to upload newly created segment files to the remote store
Expand All @@ -20,19 +29,60 @@ public class RemoteStoreRefreshListener implements ReferenceManager.RefreshListe

// Directory whose file listing is read after a refresh and from which files are copied.
private final Directory storeDirectory;
// Directory that receives the copied files and from which stale files are deleted.
private final Directory remoteDirectory;
// Names of the files currently believed to be present in the remote directory. Seeded from
// remoteDirectory.listAll() in the constructor and updated after each successful upload or delete.
// ToDo: This can be a map with metadata of the uploaded file as value of the map (GitHub #3398)
private final Set<String> filesUploadedToRemoteStore;
// Logger used to report per-file upload and delete failures.
private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class);

public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) {
public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) throws IOException {
this.storeDirectory = storeDirectory;
this.remoteDirectory = remoteDirectory;
// ToDo: Handle failures in reading list of files (GitHub #3397)
this.filesUploadedToRemoteStore = new HashSet<>(Arrays.asList(remoteDirectory.listAll()));
}

/**
 * Refresh-listener hook invoked ahead of a refresh. This listener performs no work at this point.
 * @throws IOException declared by the listener interface; never thrown by this implementation
 */
@Override
public void beforeRefresh() throws IOException {
// Intentionally a no-op: all upload and delete work is done in afterRefresh.
}

/**
 * Brings the remote segment store in line with the local store directory after a refresh.
 * Local files that have not been uploaded yet are copied to the remote store, and files that were
 * uploaded earlier but are no longer present locally are deleted from it. A failure on a single file
 * is logged and does not stop processing of the remaining files.
 * @param didRefresh true if the refresh opened a new reference; when false nothing is done
 * @throws IOException in case of I/O error in reading list of local files
 */
@Override
public void afterRefresh(boolean didRefresh) throws IOException {
    if (!didRefresh) {
        return;
    }

    final Set<String> filesOnDisk = Arrays.stream(storeDirectory.listAll()).collect(Collectors.toSet());

    // Phase 1: upload every local file that is not yet tracked as uploaded.
    for (String name : filesOnDisk) {
        if (filesUploadedToRemoteStore.contains(name)) {
            continue;
        }
        try {
            remoteDirectory.copyFrom(storeDirectory, name, name, IOContext.DEFAULT);
            // Track the file only after the copy succeeded so that a failed upload is retried later.
            filesUploadedToRemoteStore.add(name);
        } catch (NoSuchFileException e) {
            logger.info(
                () -> new ParameterizedMessage("The file {} does not exist anymore. It can happen in case of temp files", name),
                e
            );
        } catch (IOException e) {
            // ToDo: Handle transient and permanent un-availability of the remote store (GitHub #3397)
            logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", name), e);
        }
    }

    // Phase 2: delete tracked files that no longer exist locally. Successful deletions are collected
    // first and removed from the tracking set afterwards, so the set is not modified while iterated.
    // ToDo: Instead of deleting files in sync, mark them and delete in async/periodic flow (GitHub #3142)
    final Set<String> deletedFromRemote = new HashSet<>();
    for (String name : filesUploadedToRemoteStore) {
        if (filesOnDisk.contains(name)) {
            continue;
        }
        try {
            remoteDirectory.deleteFile(name);
            deletedFromRemote.add(name);
        } catch (IOException e) {
            // ToDo: Handle transient and permanent un-availability of the remote store (GitHub #3397)
            logger.warn(() -> new ParameterizedMessage("Exception while deleting file {} from the remote segment store", name), e);
        }
    }

    filesUploadedToRemoteStore.removeAll(deletedFromRemote);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.nio.file.NoSuchFileException;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.doThrow;

/**
 * Unit tests for {@link RemoteStoreRefreshListener}. Both the local store directory and the remote
 * directory are Mockito mocks, so the tests only check which copy and delete calls the listener issues.
 */
public class RemoteStoreRefreshListenerTests extends OpenSearchTestCase {
    private Directory storeDirectory;
    private Directory remoteDirectory;

    private RemoteStoreRefreshListener remoteStoreRefreshListener;

    /**
     * Creates fresh mocks and a listener whose remote directory initially lists {@code remoteFiles}.
     * @param remoteFiles file names reported by the mocked remote directory at construction time
     * @throws IOException declared by the mocked {@code listAll()} call
     */
    public void setup(String[] remoteFiles) throws IOException {
        storeDirectory = mock(Directory.class);
        remoteDirectory = mock(Directory.class);
        when(remoteDirectory.listAll()).thenReturn(remoteFiles);
        remoteStoreRefreshListener = new RemoteStoreRefreshListener(storeDirectory, remoteDirectory);
    }

    /** When no refresh happened the local directory must not even be listed. */
    public void testAfterRefreshFalse() throws IOException {
        setup(new String[0]);
        remoteStoreRefreshListener.afterRefresh(false);
        verify(storeDirectory, times(0)).listAll();
    }

    /** Empty local and remote directories lead to neither uploads nor deletes. */
    public void testAfterRefreshTrueNoLocalFiles() throws IOException {
        setup(new String[0]);

        when(storeDirectory.listAll()).thenReturn(new String[0]);

        remoteStoreRefreshListener.afterRefresh(true);
        verify(storeDirectory).listAll();
        verify(remoteDirectory, times(0)).copyFrom(any(), any(), any(), any());
        verify(remoteDirectory, times(0)).deleteFile(any());
    }

    /** Every local file is uploaded when the remote directory starts out empty. */
    public void testAfterRefreshOnlyUploadFiles() throws IOException {
        setup(new String[0]);

        String[] localFiles = new String[] { "segments_1", "0.si", "0.cfs", "0.cfe" };
        when(storeDirectory.listAll()).thenReturn(localFiles);

        remoteStoreRefreshListener.afterRefresh(true);
        verify(storeDirectory).listAll();
        verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT);
        verify(remoteDirectory).copyFrom(storeDirectory, "0.si", "0.si", IOContext.DEFAULT);
        verify(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfs", IOContext.DEFAULT);
        verify(remoteDirectory).copyFrom(storeDirectory, "0.cfe", "0.cfe", IOContext.DEFAULT);
        verify(remoteDirectory, times(0)).deleteFile(any());
    }

    /** New local files are uploaded and remote files missing locally are deleted. */
    public void testAfterRefreshOnlyUploadAndDelete() throws IOException {
        setup(new String[] { "0.si", "0.cfs" });

        String[] localFiles = new String[] { "segments_1", "1.si", "1.cfs", "1.cfe" };
        when(storeDirectory.listAll()).thenReturn(localFiles);

        remoteStoreRefreshListener.afterRefresh(true);
        verify(storeDirectory).listAll();
        verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT);
        verify(remoteDirectory).copyFrom(storeDirectory, "1.si", "1.si", IOContext.DEFAULT);
        verify(remoteDirectory).copyFrom(storeDirectory, "1.cfs", "1.cfs", IOContext.DEFAULT);
        verify(remoteDirectory).copyFrom(storeDirectory, "1.cfe", "1.cfe", IOContext.DEFAULT);
        verify(remoteDirectory).deleteFile("0.si");
        verify(remoteDirectory).deleteFile("0.cfs");
    }

    /** A remote file that is no longer present locally is deleted; nothing is uploaded. */
    public void testAfterRefreshOnlyDelete() throws IOException {
        setup(new String[] { "0.si", "0.cfs" });

        String[] localFiles = new String[] { "0.si" };
        when(storeDirectory.listAll()).thenReturn(localFiles);

        remoteStoreRefreshListener.afterRefresh(true);
        verify(storeDirectory).listAll();
        verify(remoteDirectory, times(0)).copyFrom(any(), any(), any(), any());
        verify(remoteDirectory).deleteFile("0.cfs");
    }

    /** A local file that vanishes before it can be copied does not prevent the other uploads. */
    public void testAfterRefreshTempLocalFile() throws IOException {
        setup(new String[0]);

        String[] localFiles = new String[] { "segments_1", "0.si", "0.cfs.tmp" };
        when(storeDirectory.listAll()).thenReturn(localFiles);
        doThrow(new NoSuchFileException("0.cfs.tmp")).when(remoteDirectory)
            .copyFrom(storeDirectory, "0.cfs.tmp", "0.cfs.tmp", IOContext.DEFAULT);

        remoteStoreRefreshListener.afterRefresh(true);
        verify(storeDirectory).listAll();
        verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT);
        verify(remoteDirectory).copyFrom(storeDirectory, "0.si", "0.si", IOContext.DEFAULT);
        verify(remoteDirectory, times(0)).deleteFile(any());
    }

    /**
     * Two refreshes in a row: uploads that failed in the first refresh are attempted again in the
     * second one, files uploaded successfully are not uploaded twice, and files that disappeared
     * locally are deleted from the remote directory.
     */
    public void testAfterRefreshConsecutive() throws IOException {
        setup(new String[0]);

        String[] localFiles = new String[] { "segments_1", "0.si", "0.cfs", "0.cfe" };
        when(storeDirectory.listAll()).thenReturn(localFiles);
        // Source and destination names must both match the real call, otherwise the stub never fires.
        doThrow(new IOException("0.cfs")).when(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfs", IOContext.DEFAULT);
        doThrow(new IOException("0.cfe")).when(remoteDirectory).copyFrom(storeDirectory, "0.cfe", "0.cfe", IOContext.DEFAULT);

        remoteStoreRefreshListener.afterRefresh(true);
        verify(storeDirectory).listAll();
        verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT);
        verify(remoteDirectory).copyFrom(storeDirectory, "0.si", "0.si", IOContext.DEFAULT);
        verify(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfs", IOContext.DEFAULT);
        verify(remoteDirectory).copyFrom(storeDirectory, "0.cfe", "0.cfe", IOContext.DEFAULT);
        verify(remoteDirectory, times(0)).deleteFile(any());

        String[] localFilesSecondRefresh = new String[] { "segments_1", "0.cfs", "1.cfs", "1.cfe" };
        when(storeDirectory.listAll()).thenReturn(localFilesSecondRefresh);

        remoteStoreRefreshListener.afterRefresh(true);

        // "0.cfs" failed during the first refresh, so the second refresh must try it once more.
        verify(remoteDirectory, times(2)).copyFrom(storeDirectory, "0.cfs", "0.cfs", IOContext.DEFAULT);
        verify(remoteDirectory).copyFrom(storeDirectory, "1.cfs", "1.cfs", IOContext.DEFAULT);
        verify(remoteDirectory).copyFrom(storeDirectory, "1.cfe", "1.cfe", IOContext.DEFAULT);
        // "segments_1" was uploaded by the first refresh and must not be uploaded a second time.
        verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT);
        verify(remoteDirectory).deleteFile("0.si");
    }
}

0 comments on commit 2a4c5a0

Please sign in to comment.