WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/macaron/build_spec_generator/build_spec_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ def gen_build_spec_for_purl(
case BuildSpecFormat.DOCKERFILE:
try:
build_spec_content = gen_dockerfile(build_spec)
except ValueError as error:
logger.error("Error while serializing the build spec: %s.", error)
except GenerateBuildSpecError as error:
logger.error("Error while generating the build spec: %s.", error)
return os.EX_DATAERR
build_spec_file_path = os.path.join(build_spec_dir_path, "dockerfile.buildspec")

Expand Down
2 changes: 1 addition & 1 deletion src/macaron/build_spec_generator/common_spec/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ def gen_generic_build_spec(
"purl": str(purl),
"language": target_language,
"build_tools": build_tool_names,
"build_commands": [selected_build_command],
"build_commands": [selected_build_command] if selected_build_command else [],
}
)
ECOSYSTEMS[purl.type.upper()].value(base_build_spec_dict).resolve_fields(purl)
Expand Down
52 changes: 38 additions & 14 deletions src/macaron/build_spec_generator/common_spec/pypi_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ def resolve_fields(self, purl: PackageURL) -> None:
python_version_set: set[str] = set()
wheel_name_python_version_list: list[str] = []
wheel_name_platforms: set[str] = set()
# Precautionary fallback to default version
chronologically_likeliest_version: str = defaults.get("heuristic.pypi", "default_setuptools")

if pypi_package_json is not None:
if pypi_package_json.package_json or pypi_package_json.download(dest=""):
Expand Down Expand Up @@ -150,6 +152,19 @@ def resolve_fields(self, purl: PackageURL) -> None:
parsed_build_requires["setuptools"] = "==" + defaults.get(
"heuristic.pypi", "setuptools_version_emitting_platform_unknown"
)
chronologically_likeliest_version = (
pypi_package_json.get_chronologically_suitable_setuptools_version()
)
try:
# Get information from the wheel file name.
logger.debug(pypi_package_json.wheel_filename)
_, _, _, tags = parse_wheel_filename(pypi_package_json.wheel_filename)
for tag in tags:
wheel_name_python_version_list.append(tag.interpreter)
wheel_name_platforms.add(tag.platform)
logger.debug(python_version_set)
except InvalidWheelFilename:
logger.debug("Could not parse wheel file name to extract version")
except SourceCodeError:
logger.debug("Could not find pure wheel matching this PURL")

Expand All @@ -165,6 +180,10 @@ def resolve_fields(self, purl: PackageURL) -> None:
requires = json_extract(content, ["build-system", "requires"], list)
if requires:
build_requires_set.update(elem.replace(" ", "") for elem in requires)
# If we cannot find [build-system] requires, we lean on the fact that setuptools
# was the de-facto build tool, and infer a setuptools version to include.
else:
build_requires_set.add(f"setuptools=={chronologically_likeliest_version}")
backend = json_extract(content, ["build-system", "build-backend"], str)
if backend:
build_backends_set.add(backend.replace(" ", ""))
Expand All @@ -177,6 +196,10 @@ def resolve_fields(self, purl: PackageURL) -> None:
build_requires_set,
build_backends_set,
)
# Here we have successfully analyzed the pyproject.toml file. Now, if we also have a setup.py/cfg,
# we need to infer a setuptools version to include.
if pypi_package_json.file_exists("setup.py") or pypi_package_json.file_exists("setup.cfg"):
build_requires_set.add(f"setuptools=={chronologically_likeliest_version}")
except TypeError as error:
logger.debug(
"Found a type error while reading the pyproject.toml file from the sdist: %s", error
Expand All @@ -185,6 +208,9 @@ def resolve_fields(self, purl: PackageURL) -> None:
logger.debug("Failed to read the pyproject.toml file from the sdist: %s", error)
except SourceCodeError as error:
logger.debug("No pyproject.toml found: %s", error)
# Here we do not have a pyproject.toml file. Instead, we lean on the fact that setuptools
# was the de-facto build tool, and infer a setuptools version to include.
build_requires_set.add(f"setuptools=={chronologically_likeliest_version}")
except SourceCodeError as error:
logger.debug("No source distribution found: %s", error)

Expand All @@ -198,17 +224,6 @@ def resolve_fields(self, purl: PackageURL) -> None:
except (InvalidRequirement, InvalidSpecifier) as error:
logger.debug("Malformed requirement encountered %s : %s", requirement, error)

try:
# Get information from the wheel file name.
logger.debug(pypi_package_json.wheel_filename)
_, _, _, tags = parse_wheel_filename(pypi_package_json.wheel_filename)
for tag in tags:
wheel_name_python_version_list.append(tag.interpreter)
wheel_name_platforms.add(tag.platform)
logger.debug(python_version_set)
except InvalidWheelFilename:
logger.debug("Could not parse wheel file name to extract version")

self.data["language_version"] = list(python_version_set) or wheel_name_python_version_list

# Use the default build command for pure Python packages.
Expand All @@ -227,9 +242,18 @@ def resolve_fields(self, purl: PackageURL) -> None:

if not patched_build_commands:
# Resolve and patch build commands.
selected_build_commands = self.data["build_commands"] or self.get_default_build_commands(
self.data["build_tools"]
)

# To ensure that selected_build_commands is never empty, we seed with the fallback
# command of python -m build --wheel -n
if self.data["build_commands"]:
selected_build_commands = self.data["build_commands"]
else:
self.data["build_commands"] = ["python -m build --wheel -n".split()]
selected_build_commands = (
self.get_default_build_commands(self.data["build_tools"]) or self.data["build_commands"]
)

logger.debug(selected_build_commands)

patched_build_commands = (
patch_commands(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""This module implements the logic to generate a dockerfile from a Python buildspec."""

import logging
import re
from textwrap import dedent

from packaging.specifiers import InvalidSpecifier, SpecifierSet
Expand Down Expand Up @@ -35,8 +36,7 @@ def gen_dockerfile(buildspec: BaseBuildSpecDict) -> str:
"""
language_version: str | None = pick_specific_version(buildspec)
if language_version is None:
logger.debug("Could not derive a specific interpreter version.")
raise GenerateBuildSpecError("Could not derive specific interpreter version.")
raise GenerateBuildSpecError("Could not derive specific interpreter version")
backend_install_commands: str = " && ".join(build_backend_commands(buildspec))
build_tool_install: str = ""
if (
Expand Down Expand Up @@ -124,8 +124,18 @@ def pick_specific_version(buildspec: BaseBuildSpecDict) -> str | None:
try:
version_set &= SpecifierSet(version)
except InvalidSpecifier as error:
logger.debug("Malformed interpreter version encountered: %s (%s)", version, error)
return None
logger.debug("Non-standard interpreter version encountered: %s (%s)", version, error)
# Whilst the Python tags specify interpreter implementation
# as well as version, with no standard way to parse out the
# implementation, we can attempt to heuristically:
try_parse_version = infer_interpreter_version(version)
if try_parse_version:
try:
version_set &= SpecifierSet(f">={try_parse_version}")
except InvalidSpecifier as error_for_retry:
logger.debug("Could not parse interpreter version from: %s (%s)", version, error_for_retry)

logger.debug(version_set)

# Now to get the latest acceptable one, we can step through all interpreter
# versions. For the most accurate result, we can query python.org for a
Expand All @@ -141,6 +151,31 @@ def pick_specific_version(buildspec: BaseBuildSpecDict) -> str | None:
return None


def infer_interpreter_version(tag: str) -> str | None:
"""Infer interpreter version from Python-tag.

Parameters
----------
tag: Python-tag, likely inferred from wheel name.


Returns
-------
str: interpreter version inferred from Python-tag
"""
# We will parse the interpreter version of CPython or just
# whatever generic Python version is specified.
pattern = re.compile(r"^(py|cp)(\d{1,3})$")
parsed_tag = pattern.match(tag)
if parsed_tag:
digits = parsed_tag.group(2)
# As match succeeded len(digits) \in {1,2,3}
if len(digits) == 1:
return parsed_tag.group(2)
return f"{digits[0]}.{digits[1:]}"
return None


def build_backend_commands(buildspec: BaseBuildSpecDict) -> list[str]:
"""Generate the installation commands for each inferred build backend.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
False,
pypi_package_json.pypi_registry,
{},
"",
"",
"",
PyPIInspectorAsset("", [], {}),
)
if not adjacent_pypi_json.download(""):
Expand Down
2 changes: 1 addition & 1 deletion src/macaron/repo_finder/repo_finder_pypi.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def find_repo(
if not pypi_registry:
return "", RepoFinderInfo.PYPI_NO_REGISTRY
pypi_asset = PyPIPackageJsonAsset(
purl.name, purl.version, False, pypi_registry, {}, "", "", "", PyPIInspectorAsset("", [], {})
purl.name, purl.version, False, pypi_registry, {}, PyPIInspectorAsset("", [], {})
)

if not pypi_asset:
Expand Down
92 changes: 85 additions & 7 deletions src/macaron/slsa_analyzer/package_registry/pypi_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""The module provides abstractions for the pypi package registry."""
from __future__ import annotations

import bisect
import hashlib
import logging
import os
Expand All @@ -15,7 +16,7 @@
import zipfile
from collections.abc import Callable, Generator, Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -502,6 +503,42 @@ def get_maintainer_join_date(self, username: str) -> datetime | None:

return res.replace(tzinfo=None) if res else None

def get_matching_setuptools_version(self, package_release_datetime: datetime) -> str:
    """Find the setuptools that would be "latest" for the input datetime.

    The setuptools release list is downloaded from the registry, ordered by the
    upload time of the first file of each release, and the most recent release
    uploaded strictly before ``package_release_datetime`` is selected.

    Parameters
    ----------
    package_release_datetime: datetime
        Release datetime of a package we wish to rebuild.

    Returns
    -------
    str
        Matching version of setuptools. The configured default version is returned
        when no release information is available, or when the package predates every
        known setuptools release.
    """
    setuptools_endpoint = urllib.parse.urljoin(self.registry_url, "pypi/setuptools/json")
    setuptools_json = self.download_package_json(setuptools_endpoint)
    releases = json_extract(setuptools_json, ["releases"], dict)
    if not releases:
        # This realistically cannot happen: it would mean setuptools has no releases.
        # Return default just in case.
        return defaults.get("heuristic.pypi", "default_setuptools")

    # Parse every timestamp exactly once, skipping releases without a usable upload time
    # so that neither the sort nor the search can fail on None or malformed values.
    dated_releases: list[tuple[datetime, str]] = []
    for version, release_info in releases.items():
        if not release_info:
            continue
        first_file = release_info[0]
        upload_time = first_file.get("upload_time") if isinstance(first_file, dict) else None
        if not isinstance(upload_time, str):
            continue
        try:
            uploaded_at = datetime.strptime(upload_time, "%Y-%m-%dT%H:%M:%S")
        except ValueError:
            logger.debug("Skipping setuptools %s with malformed upload time: %s", version, upload_time)
            continue
        dated_releases.append((uploaded_at, str(version)))

    # Cannot assume this is sorted, as releases is just a dict.
    dated_releases.sort(key=lambda release: release[0])
    upload_times = [uploaded_at for uploaded_at, _ in dated_releases]

    # bisect_left gives position to insert package_release_datetime to maintain order, hence we do -1.
    index = bisect.bisect_left(upload_times, package_release_datetime) - 1
    if index < 0:
        # Either there is no usable release, or the package predates all of them. Indexing
        # with -1 would silently select the newest setuptools, so fall back to the default.
        logger.debug("No setuptools release found before %s.", package_release_datetime)
        return defaults.get("heuristic.pypi", "default_setuptools")

    return dated_releases[index][1]

@staticmethod
def extract_attestation(attestation_data: dict) -> dict | None:
"""Extract the first attestation file from a PyPI attestation response.
Expand Down Expand Up @@ -618,13 +655,16 @@ class PyPIPackageJsonAsset:
package_json: dict

#: The source code temporary location name.
package_sourcecode_path: str
package_sourcecode_path: str = field(init=False)

#: The wheel temporary location name.
wheel_path: str
wheel_path: str = field(init=False)

#: Name of the wheel file.
wheel_filename: str
wheel_filename: str = field(init=False)

#: The datetime that the wheel was uploaded.
package_upload_time: datetime = field(init=False)

#: The pypi inspector information about this package
inspector_asset: PyPIInspectorAsset
Expand Down Expand Up @@ -769,6 +809,8 @@ def get_wheel_url(self, tag: str = "none-any") -> str | None:
if not urls:
return None
for distribution in urls:
# This way we have a package_upload_time even if we cannot find the wheel
self.package_upload_time = datetime.strptime(distribution.get("upload_time") or "", "%Y-%m-%dT%H:%M:%S")
# Only examine wheels
if distribution.get("packagetype") != "bdist_wheel":
continue
Expand All @@ -779,6 +821,7 @@ def get_wheel_url(self, tag: str = "none-any") -> str | None:
# Continue to getting url
wheel_url: str = distribution.get("url") or ""
if wheel_url:
self.package_upload_time = datetime.strptime(distribution.get("upload_time") or "", "%Y-%m-%dT%H:%M:%S")
try:
parsed_url = urllib.parse.urlparse(wheel_url)
except ValueError:
Expand Down Expand Up @@ -919,6 +962,33 @@ def get_sourcecode_file_contents(self, path: str) -> bytes:
logger.debug(error_msg)
raise SourceCodeError(error_msg) from read_error

def file_exists(self, path: str) -> bool:
    """Check if a file exists in the downloaded source code.

    The path can be relative to the package_sourcecode_path attribute, or an absolute path.

    Parameters
    ----------
    path: str
        The absolute or relative to package_sourcecode_path file path to check for.

    Returns
    -------
    bool
        Whether or not a file at path absolute or relative to package_sourcecode_path exists.
        False is returned when no source code has been downloaded.
    """
    # package_sourcecode_path is declared with field(init=False) and no default, so the
    # attribute may not exist yet; treat that the same as an empty path.
    sourcecode_path = getattr(self, "package_sourcecode_path", "")
    if not sourcecode_path:
        # No source code files were downloaded
        return False

    if not os.path.isabs(path):
        path = os.path.join(sourcecode_path, path)

    return os.path.exists(path)

def iter_sourcecode(self) -> Iterator[tuple[str, bytes]]:
"""
Iterate through all source code files.
Expand Down Expand Up @@ -1054,6 +1124,16 @@ def get_inspector_src_preview_links(self) -> bool:
# If all distributions were invalid and went along a 'continue' path.
return bool(self.inspector_asset)

def get_chronologically_suitable_setuptools_version(self) -> str:
    """Find version of setuptools that would be "latest" for this package.

    Returns
    -------
    str
        Chronologically likeliest setuptools version, as reported by the registry for this
        package's upload time. The configured default version is returned when no upload
        time has been recorded for this package.
    """
    # package_upload_time is declared with field(init=False) and no default; it is only
    # assigned while looking up the wheel URL, so it may not exist on this instance yet.
    upload_time = getattr(self, "package_upload_time", None)
    if upload_time is None:
        logger.debug("No upload time recorded for this package; using the default setuptools version.")
        return defaults.get("heuristic.pypi", "default_setuptools")
    return self.pypi_registry.get_matching_setuptools_version(upload_time)


def find_or_create_pypi_asset(
asset_name: str, asset_version: str | None, pypi_registry_info: PackageRegistryInfo
Expand Down Expand Up @@ -1091,8 +1171,6 @@ def find_or_create_pypi_asset(
logger.debug("Failed to create PyPIPackageJson asset.")
return None

asset = PyPIPackageJsonAsset(
asset_name, asset_version, False, package_registry, {}, "", "", "", PyPIInspectorAsset("", [], {})
)
asset = PyPIPackageJsonAsset(asset_name, asset_version, False, package_registry, {}, PyPIInspectorAsset("", [], {}))
pypi_registry_info.metadata.append(asset)
return asset
2 changes: 1 addition & 1 deletion tests/malware_analyzer/pypi/test_wheel_absence.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def test_get_inspector_src_preview_links(mock_send_head_http_raw: MagicMock) ->
mock_send_head_http_raw.return_value = MagicMock() # Assume valid URL for testing purposes.

pypi_package_json = PyPIPackageJsonAsset(
package_name, version, False, pypi_registry, package_json, "", "", "", PyPIInspectorAsset("", [], {})
package_name, version, False, pypi_registry, package_json, PyPIInspectorAsset("", [], {})
)

assert pypi_package_json.get_inspector_src_preview_links() is True
Expand Down
Loading