Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
scripts: oss scorecard pindeps: Use pip-tools compile
Browse files Browse the repository at this point in the history
Signed-off-by: John Andersen <[email protected]>
  • Loading branch information
pdxjohnny committed Jun 22, 2024
1 parent a9a16aa commit 1a56acc
Showing 1 changed file with 177 additions and 25 deletions.
202 changes: 177 additions & 25 deletions scripts/ossf_scorecard_pindeps.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,20 @@
import json
import copy
import shlex
import shutil
import tarfile
import zipfile
import pathlib
import textwrap
import tempfile
import functools
import subprocess
import dataclasses
import urllib.request
from typing import List, Optional

import tomli_w
import cachier
from pydantic import BaseModel


Expand All @@ -25,6 +33,70 @@ def pypi_package_json(package: str) -> dict:
return json.load(resp)


# Remove tempdir from cache
@cachier.cachier(hash_func=lambda args, kwargs: kwargs["url"])
def find_package_name_from_zip(tempdir, url):
# - Download the package
# - Run sys.executable -m build . on the package
# - Read `Name: return this-value` from `dist/*.tar.gz/*/PKG-INFO
url_parsed = urllib.parse.urlparse(url, allow_fragments=True)
if url_parsed.fragment:
fragment = urllib.parse.parse_qs(url_parsed.fragment)

if "egg" in fragment:
return fragment["egg"][0]

if "subdiretory" in fragment:
raise NotImplementedError("subdirectory within URL fragment (#)")

package_zip_path = tempdir.joinpath("package.zip")
package_path = tempdir.joinpath("package")
with open(
package_zip_path,
"wb",
) as zip_fileobj, urllib.request.urlopen(
url,
) as resp: # skipcq: BAN-B310
shutil.copyfileobj(resp, zip_fileobj)

with zipfile.ZipFile(package_zip_path) as zipfileobj:
zipfileobj.extractall(path=package_path)

package_path = list(package_path.glob("*"))[0]
built_package_path = tempdir.joinpath("built_package")

cmd = [
sys.executable,
"-m",
"build",
".",
]
subprocess.check_call(cmd, cwd=package_path)

built_package_zip_path = list(package_path.joinpath("dist").glob("*.whl"))[
0
]

with zipfile.ZipFile(built_package_zip_path) as zipfileobj:
zipfileobj.extractall(path=built_package_path)

built_package_pkg_info_path = list(built_package_path.glob("*.dist-info"))[
0
].joinpath(
"METADATA",
)

return list(
[
line.split(":", maxsplit=1)[1]
for line in built_package_pkg_info_path.read_text().split("\n")
if line.startswith("Name:")
]
)[0].strip()


# @cachier.cachier(hash_func=lambda args, kwargs: shlex.join(kwargs["cmd"]))
@snoop
def pin_packages(cmd):
cmd = copy.copy(cmd)
i_install = cmd.index("install")
Expand All @@ -36,15 +108,18 @@ def pin_packages(cmd):
packages = [
arg
for i, arg in enumerate(cmd[i_install + 1 :], start=i_install + 1)
if cmd[i - 1] != "-e" and not arg.startswith("-")
if cmd[i - 1] != "-e" and not arg.startswith("-") and arg.strip()
]
if "-U" in cmd:
cmd.remove("-U")
if "--upgrade" in cmd:
cmd.remove("--upgrade")

for i, package_name in enumerate(packages):
if (
not package_name.strip()
or package_name.startswith("http://")
or package_name.startswith("https://")
or package_name.startswith("git+")
or "http://" in package_name
or "https://" in package_name
or "==" in package_name
):
continue
Expand All @@ -53,18 +128,60 @@ def pin_packages(cmd):

pypi_latest_package_version = package_json["info"]["version"]

for release_dict in package_json["releases"][
pypi_latest_package_version
]:
cmd.insert(
cmd.index(package_name, i_install + 1) + 2,
f'--hash=sha256:{release_dict["digests"]["sha256"]}',
packages[
packages.index(package_name)
] = f"{package_name}=={pypi_latest_package_version}"

with tempfile.TemporaryDirectory() as tempdir:
tempdir = pathlib.Path(tempdir)
if not packages:
return None

pyproject_toml_dir_path = tempdir.joinpath("pyproject-gen")
pyproject_toml_dir_path.mkdir()
pyproject_toml_path = pyproject_toml_dir_path.joinpath(
"pyproject.toml"
)
pyproject_toml_path.write_text(
tomli_w.dumps(
{
"build-system": {
"requires": ["setuptools >= 61.0"],
"build-backend": "setuptools.build_meta",
},
"project": {
"name": "pip-tools-compile-pin-deps",
"version": "1.0.0",
"dependencies": [
*editable_packages,
*[
(
arg
if not ".zip" in arg and not "://" in arg
else f"{find_package_name_from_zip(tempdir, arg)} @ {arg}"
)
for arg in packages
if "==" not in arg and "$" not in arg
],
],
},
},
)
)

cmd[
cmd.index(package_name, i_install + 1)
] = f"{package_name}=={pypi_latest_package_version}"
return cmd
print(pyproject_toml_path.read_text())

cmd = [
sys.executable,
"-m",
"piptools",
"compile",
"--generate-hashes",
str(pyproject_toml_path.resolve()),
]
return subprocess.check_output(
cmd,
)


class Snippet(BaseModel):
Expand Down Expand Up @@ -127,14 +244,10 @@ def get_sarif_results(ossf_scorecard_sarif):
else:
yield event_subject, result


@snoop
def main():
for event, result in get_sarif_results(json.load(sys.stdin)):
for location in result.locations:
pip_install_command = shlex.split(
location.physicalLocation.region.snippet.text
)
pinned_pip_install_command = pin_packages(pip_install_command)
if (
location.physicalLocation.artifactLocation.uriBaseId
!= "%SRCROOT%"
Expand All @@ -147,12 +260,41 @@ def main():
)
if not path.is_file():
raise FileNotFoundError(path)

snippet = location.physicalLocation.region.snippet

lines = path.read_text().split("\n")

env = {}
for line_number, line in enumerate(lines):
if "export " in line and "=" in line:
key, value = (
line.split("export ", maxsplit=1)[1]
.strip()
.split("=", maxsplit=1)
)
env[key] = value

for env_var_name, replace in env.items():
for find in [
"$" + env_var_name,
"${" + env_var_name + "}",
]:
if find in snippet.text:
snippet.text = snippet.text.replace(find, replace)

pip_install_command = shlex.split(snippet.text)
pinned_pip_install_command = pin_packages(pip_install_command)
if pinned_pip_install_command is None:
snoop.pp("Nothing we can do here", pip_install_command)
continue

new_lines = []
for line_number, line in enumerate(path.read_text().split("\n")):
for line_number, line in enumerate(lines):
if (
line_number >= location.physicalLocation.region.startLine
and line_number <= location.physicalLocation.region.endLine
and location.physicalLocation.region.snippet.text in line
and snippet.text in line
):
with snoop():
line_start = line[: line.index(pip_install_command[0])]
Expand All @@ -164,11 +306,21 @@ def main():
pip_install_command[-1], i_line_end
) + len(pip_install_command[-1])
line_end = line[i_line_end:]
line = (
line_start
+ shlex.join(pinned_pip_install_command)
+ line_end
shlex.join(pinned_pip_install_command)
new_lines.append(
shlex.join(
[
"echo",
"-e",
pinned_pip_install_command,
"|",
"tee",
"requirements.txt",
]
)
)
line = line_start + "-r requirements.txt" + line_end
snoop.pp(new_lines[-1], line)
snoop.pp(location.physicalLocation, line_number, line)
new_lines.append(line)
path.write_text("\n".join(new_lines))
Expand Down

0 comments on commit 1a56acc

Please sign in to comment.