Skip to content

Commit

Permalink
Merge pull request #578 from ChrisCummins/fix/flaky-rpc
Browse files Browse the repository at this point in the history
Hardening patchset
  • Loading branch information
ChrisCummins authored Feb 22, 2022
2 parents 1d095e2 + 25c982d commit d70408f
Show file tree
Hide file tree
Showing 34 changed files with 270 additions and 286 deletions.
9 changes: 9 additions & 0 deletions compiler_gym/envs/compiler_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -1142,6 +1142,15 @@ def apply(self, state: CompilerEnvState) -> None: # noqa
if not self.in_episode:
self.reset(benchmark=state.benchmark)

# TODO(cummins): Does this behavior make sense? Take, for example:
#
# >>> env.apply(state)
# >>> env.benchmark == state.benchmark
# False
#
# I think most users would reasonable expect `env.apply(state)` to fully
# apply the state, not just the sequence of actions. And what about the
# choice of observation space, reward space, etc?
if self.benchmark != state.benchmark:
warnings.warn(
f"Applying state from environment for benchmark '{state.benchmark}' "
Expand Down
4 changes: 2 additions & 2 deletions compiler_gym/envs/llvm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from compiler_gym.envs.llvm.compute_observation import compute_observation
from compiler_gym.envs.llvm.llvm_benchmark import (
ClangInvocation,
get_system_includes,
get_system_library_flags,
make_benchmark,
)
from compiler_gym.envs.llvm.llvm_env import LlvmEnv
Expand All @@ -24,7 +24,7 @@
__all__ = [
"ClangInvocation",
"compute_observation",
"get_system_includes",
"get_system_library_flags",
"LLVM_SERVICE_BINARY",
"LlvmEnv",
"make_benchmark",
Expand Down
5 changes: 4 additions & 1 deletion compiler_gym/envs/llvm/datasets/cbench.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from compiler_gym.datasets import Benchmark, TarDatasetWithManifest
from compiler_gym.datasets.uri import BenchmarkUri
from compiler_gym.envs.llvm import llvm_benchmark
from compiler_gym.service.proto import BenchmarkDynamicConfig, Command
from compiler_gym.third_party import llvm
from compiler_gym.util.commands import Popen
Expand Down Expand Up @@ -491,7 +492,9 @@ def validator(
DYNAMIC_CONFIGS[uri.path].append(
BenchmarkDynamicConfig(
build_cmd=Command(
argument=["$CC", "$IN"] + linkopts,
argument=["$CC", "$IN"]
+ llvm_benchmark.get_system_library_flags()
+ linkopts,
timeout_seconds=60,
outfile=["a.out"],
),
Expand Down
3 changes: 2 additions & 1 deletion compiler_gym/envs/llvm/datasets/csmith.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from compiler_gym.datasets import Benchmark, BenchmarkSource, Dataset
from compiler_gym.datasets.benchmark import BenchmarkInitError, BenchmarkWithSource
from compiler_gym.datasets.uri import BenchmarkUri
from compiler_gym.envs.llvm import llvm_benchmark
from compiler_gym.envs.llvm.llvm_benchmark import ClangInvocation
from compiler_gym.service.proto import BenchmarkDynamicConfig, Command
from compiler_gym.util.commands import Popen, communicate
Expand Down Expand Up @@ -40,7 +41,7 @@ def __init__(self, *args, **kwargs):
self.proto.dynamic_config.MergeFrom(
BenchmarkDynamicConfig(
build_cmd=Command(
argument=["$CC", "$IN"],
argument=["$CC", "$IN"] + llvm_benchmark.get_system_library_flags(),
outfile=["a.out"],
timeout_seconds=60,
),
Expand Down
114 changes: 70 additions & 44 deletions compiler_gym/envs/llvm/llvm_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
import os
import random
import subprocess
import sys
import tempfile
from concurrent.futures import as_completed
from datetime import datetime
from functools import lru_cache
from pathlib import Path
from typing import Iterable, List, Optional, Union

Expand All @@ -22,31 +24,46 @@
logger = logging.getLogger(__name__)


def get_compiler_includes(compiler: str) -> Iterable[Path]:
"""Run the system compiler in verbose mode on a dummy input to get the
system header search path.
"""
# Create a temporary directory to write the compiled 'binary' to, since
# GNU assembler does not support piping to stdout.
with tempfile.TemporaryDirectory() as d:
class HostCompilerFailure(OSError):
"""Exception raised when the system compiler fails."""


class UnableToParseHostCompilerOutput(HostCompilerFailure):
"""Exception raised if unable to parse the verbose output of the host
compiler."""


def _get_system_library_flags(compiler: str) -> Iterable[str]:
"""Private implementation function."""
# Create a temporary directory to write the compiled binary to, since GNU
# assembler does not support piping to stdout.
transient_cache = transient_cache_path(".")
transient_cache.mkdir(parents=True, exist_ok=True)
with tempfile.NamedTemporaryFile(dir=transient_cache) as f:
try:
cmd = [compiler, "-xc++", "-v", "-", "-o", f.name]
# On macOS we need to compile a binary to invoke the linker.
if sys.platform != "darwin":
cmd.append("-c")
with Popen(
[compiler, "-xc++", "-v", "-c", "-", "-o", str(Path(d) / "a.out")],
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE,
universal_newlines=True,
) as process:
_, stderr = process.communicate(input="", timeout=30)
_, stderr = process.communicate(
input="int main(){return 0;}", timeout=30
)
if process.returncode:
raise OSError(
f"Failed to invoke {compiler}. "
raise HostCompilerFailure(
f"Failed to invoke '{compiler}'. "
f"Is there a working system compiler?\n"
f"Error: {stderr.strip()}"
)
except FileNotFoundError as e:
raise OSError(
f"Failed to invoke {compiler}. "
raise HostCompilerFailure(
f"Failed to invoke '{compiler}'. "
f"Is there a working system compiler?\n"
f"Error: {e}"
) from e
Expand All @@ -59,53 +76,65 @@ def get_compiler_includes(compiler: str) -> Iterable[Path]:
# /path/2
# End of search list
in_search_list = False
for line in stderr.split("\n"):
lines = stderr.split("\n")
for line in lines:
if in_search_list and line.startswith("End of search list"):
break
elif in_search_list:
# We have an include path to return.
path = Path(line.strip())
yield path
yield "-isystem"
yield str(path)
# Compatibility fix for compiling benchmark sources which use the
# '#include <endian.h>' header, which on macOS is located in a
# 'machine/endian.h' directory.
if (path / "machine").is_dir():
yield path / "machine"
yield "-isystem"
yield str(path / "machine")
elif line.startswith("#include <...> search starts here:"):
in_search_list = True
else:
msg = f"Failed to parse '#include <...>' search paths from {compiler}"
msg = f"Failed to parse '#include <...>' search paths from '{compiler}'"
stderr = stderr.strip()
if stderr:
msg += f":\n{stderr}"
raise OSError(msg)
raise UnableToParseHostCompilerOutput(msg)

if sys.platform == "darwin":
yield "-L/Library/Developer/CommandLineTools/SDKs/MacOSX.sdk/usr/lib"

# Memoized search paths. Call get_system_includes() to access them.
_SYSTEM_INCLUDES = None

@lru_cache(maxsize=32)
def _get_cached_system_library_flags(compiler: str) -> List[str]:
"""Private implementation detail."""
return list(_get_system_library_flags(compiler))

def get_system_includes() -> List[Path]:
"""Determine the system include paths for C/C++ compilation jobs.

def get_system_library_flags(compiler: Optional[str] = None) -> List[str]:
"""Determine the set of compilation flags needed to use the host system
libraries.
This uses the system compiler to determine the search paths for C/C++ system
headers. By default, :code:`c++` is invoked. This can be overridden by
setting :code:`os.environ["CXX"]`.
headers, and on macOS, the location of libclang_rt.osx.a. By default,
:code:`c++` is invoked. This can be overridden by setting
:code:`os.environ["CXX"]` prior to calling this function.
The results of this function are cached, so changes to CXX will have no
effect on subsequent calls.
:return: A list of command line flags for a compiler.
:return: A list of paths to system header directories.
:raises OSError: If the compiler fails, or if the search paths cannot be
determined.
:raises HostCompilerFailure: If the host compiler cannot be determined, or
fails to compile a trivial piece of code.
:raises UnableToParseHostCompilerOutput: If the output of the compiler
cannot be understood.
"""
# Memoize the system includes paths.
global _SYSTEM_INCLUDES
if _SYSTEM_INCLUDES is None:
system_compiler = os.environ.get("CXX", "c++")
try:
_SYSTEM_INCLUDES = list(get_compiler_includes(system_compiler))
except OSError as e:
logger.warning("%s", e)
_SYSTEM_INCLUDES = []
return _SYSTEM_INCLUDES
compiler = compiler or (os.environ.get("CXX") or "c++")
# We want to cache the results of this expensive query after resolving the
# default value for the compiler argument, as it can changed based on
# environment variables.
return _get_cached_system_library_flags(compiler)


class ClangInvocation:
Expand All @@ -119,7 +148,7 @@ def __init__(
:param args: The list of arguments to pass to clang.
:param system_includes: Whether to include the system standard libraries
during compilation jobs. This requires a system toolchain. See
:func:`get_system_includes`.
:func:`get_system_library_flags`.
:param timeout: The maximum number of seconds to allow clang to run
before terminating.
"""
Expand All @@ -128,13 +157,10 @@ def __init__(
self.timeout = timeout

def command(self, outpath: Path) -> List[str]:
cmd = [str(llvm.clang_path())]
cmd = [str(llvm.clang_path()), "-c", "-emit-llvm", "-o", str(outpath)]
if self.system_includes:
for directory in get_system_includes():
cmd += ["-isystem", str(directory)]

cmd += get_system_library_flags()
cmd += [str(s) for s in self.args]
cmd += ["-c", "-emit-llvm", "-o", str(outpath)]

return cmd

Expand Down Expand Up @@ -253,7 +279,7 @@ def make_benchmark(
:param system_includes: Whether to include the system standard libraries
during compilation jobs. This requires a system toolchain. See
:func:`get_system_includes`.
:func:`get_system_library_flags`.
:param timeout: The maximum number of seconds to allow clang to run before
terminating.
Expand Down
2 changes: 1 addition & 1 deletion compiler_gym/envs/llvm/llvm_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ def make_benchmark(
:param system_includes: Whether to include the system standard libraries
during compilation jobs. This requires a system toolchain. See
:func:`get_system_includes`.
:func:`get_system_library_flags`.
:param timeout: The maximum number of seconds to allow clang to run
before terminating.
Expand Down
16 changes: 9 additions & 7 deletions compiler_gym/envs/llvm/service/Cost.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ Status getTextSizeInBytes(llvm::Module& module, int64_t* value, const fs::path&
fmt::format("{} -w -xir - -o {} -c", clangPath.string(), tmpFile.string());
#endif

boost::asio::io_service clangService;
boost::asio::io_context clangContext;
auto stdinBuffer{boost::asio::buffer(ir)};
bp::async_pipe stdinPipe(clangService);
bp::async_pipe stdinPipe(clangContext);
boost::asio::io_context clangStderrStream;
std::future<std::string> clangStderrFuture;

Expand All @@ -88,8 +88,8 @@ Status getTextSizeInBytes(llvm::Module& module, int64_t* value, const fs::path&
stdinPipe, stdinBuffer,
[&](const boost::system::error_code& ec, std::size_t n) { stdinPipe.async_close(); });

clangService.run_for(std::chrono::seconds(60));
if (clangService.poll()) {
clangContext.run_for(std::chrono::seconds(60));
if (clangContext.poll()) {
return Status(StatusCode::INVALID_ARGUMENT,
fmt::format("Failed to compute .text size cost within 60 seconds"));
}
Expand All @@ -113,20 +113,21 @@ Status getTextSizeInBytes(llvm::Module& module, int64_t* value, const fs::path&
bp::child llvmSize(llvmSizeCmd, bp::std_in.close(), bp::std_out > llvmSizeStdoutFuture,
bp::std_err > bp::null, llvmSizeStdoutStream);

if (!util::wait_for(llvmSize, std::chrono::seconds(60))) {
llvmSizeStdoutStream.run_for(std::chrono::seconds(60));
if (llvmSizeStdoutStream.poll()) {
return Status(StatusCode::DEADLINE_EXCEEDED,
fmt::format("Failed to compute .text size cost within 60 seconds"));
}
llvmSize.wait();
llvmSizeOutput = llvmSizeStdoutFuture.get();

llvmSizeStdoutStream.run();
fs::remove(tmpFile);
if (llvmSize.exit_code()) {
return Status(StatusCode::INVALID_ARGUMENT, fmt::format("Failed to compute .text size cost. "
"Command returned exit code {}: {}",
llvmSize.exit_code(), llvmSizeCmd));
}

llvmSizeOutput = llvmSizeStdoutFuture.get();
} catch (bp::process_error& e) {
fs::remove(tmpFile);
return Status(StatusCode::INVALID_ARGUMENT,
Expand Down Expand Up @@ -154,6 +155,7 @@ Status getTextSizeInBytes(llvm::Module& module, int64_t* value, const fs::path&
return Status(StatusCode::INTERNAL,
fmt::format("Failed to parse .TEXT size: `{}`\n", llvmSizeOutput));
}

return Status::OK;
}

Expand Down
2 changes: 1 addition & 1 deletion compiler_gym/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ absl-py>=0.10.0
deprecated>=1.2.12
docker>=4.0.0
fasteners>=0.15
grpcio>=1.32.0
grpcio>=1.32.0,<1.44.0
gym>=0.18.0,<0.21
humanize>=2.6.0
loop_tool_py==0.0.7
Expand Down
14 changes: 11 additions & 3 deletions compiler_gym/service/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,17 @@
GRPC_CHANNEL_OPTIONS = [
# Disable the inbound message length filter to allow for large messages such
# as observations.
("grpc.max_send_message_length", -1),
("grpc.max_receive_message_length", -1),
# Fix for "received initial metadata size exceeds limit"
("grpc.max_metadata_size", 512 * 1024),
# Spurious error UNAVAILABLE "Trying to connect an http1.x server".
# https://putridparrot.com/blog/the-unavailable-trying-to-connect-an-http1-x-server-grpc-error/
("grpc.enable_http_proxy", 0),
# Disable TCP port re-use to mitigate port conflict errors when starting
# many services in parallel. Context:
# https:/facebookresearch/CompilerGym/issues/572
("grpc.so_reuseport", 0),
]

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -529,8 +534,11 @@ def close(self):
super().close()

def __repr__(self):
alive_or_dead = "alive" if self.process.poll() else "dead"
return f"{self.url} running on PID={self.process.pid} ({alive_or_dead})"
if self.process.poll() is None:
return (
f"Connection to service at {self.url} running on PID {self.process.pid}"
)
return f"Connection to dead service at {self.url}"


class UnmanagedConnection(Connection):
Expand Down Expand Up @@ -570,7 +578,7 @@ def __init__(self, url: str, rpc_init_max_seconds: float):
super().__init__(channel, url)

def __repr__(self):
return self.url
return f"Connection to unmanaged service {self.url}"


class CompilerGymServiceConnection:
Expand Down
1 change: 1 addition & 0 deletions compiler_gym/service/runtime/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ py_library(
srcs = ["create_and_run_compiler_gym_service.py"],
deps = [
":compiler_gym_service",
"//compiler_gym/service:connection",
"//compiler_gym/service/proto",
"//compiler_gym/util",
],
Expand Down
Loading

0 comments on commit d70408f

Please sign in to comment.