Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Codify linkcheck status codes into a Literal #13040

Merged
merged 4 commits into from
Oct 19, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions sphinx/builders/linkcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

if TYPE_CHECKING:
from collections.abc import Callable, Iterator
from typing import Any
from typing import Any, Literal, TypeAlias

from requests import Response

Expand All @@ -38,6 +38,17 @@
from sphinx.util._pathlib import _StrPath
from sphinx.util.typing import ExtensionMetadata

_Statuses: TypeAlias = Literal[
'broken',
'ignored',
'local',
'rate-limited',
'redirected',
'timeout',
'unchecked',
'working',
]

logger = logging.getLogger(__name__)

# matches to foo:// and // (a protocol relative URL)
Expand Down Expand Up @@ -85,7 +96,7 @@ def finish(self) -> None:
def process_result(self, result: CheckResult) -> None:
filename = self.env.doc2path(result.docname, False)

linkstat: dict[str, str | int] = {
linkstat: dict[str, str | int | _Statuses] = {
'filename': str(filename),
'lineno': result.lineno,
'status': result.status,
Expand Down Expand Up @@ -182,14 +193,20 @@ def process_result(self, result: CheckResult) -> None:
result.uri + ' to ' + result.message,
)
else:
raise ValueError('Unknown status %s.' % result.status)
msg = f'Unknown status {result.status!r}.'
raise ValueError(msg)

def write_linkstat(self, data: dict[str, str | int]) -> None:
self.json_outfile.write(json.dumps(data))
self.json_outfile.write('\n')

def write_entry(
self, what: str, docname: str, filename: _StrPath, line: int, uri: str
self,
what: _Statuses | str,
docname: str,
filename: _StrPath,
line: int,
uri: str,
) -> None:
self.txt_outfile.write(f'{filename}:{line}: [{what}] {uri}\n')

Expand Down Expand Up @@ -330,7 +347,7 @@ class CheckResult(NamedTuple):
uri: str
docname: str
lineno: int
AA-Turner marked this conversation as resolved.
Show resolved Hide resolved
status: str
status: _Statuses | Literal['']
message: str
code: int

Expand Down Expand Up @@ -373,6 +390,7 @@ def __init__(
self.retries: int = config.linkcheck_retries
self.rate_limit_timeout = config.linkcheck_rate_limit_timeout
self._allow_unauthorized = config.linkcheck_allow_unauthorized
self._timeout_status: Literal['broken', 'timeout']
jayaddison marked this conversation as resolved.
Show resolved Hide resolved
if config.linkcheck_report_timeouts_as_broken:
self._timeout_status = 'broken'
else:
Expand Down Expand Up @@ -423,7 +441,7 @@ def run(self) -> None:

def _check(
self, docname: str, uri: str, hyperlink: Hyperlink
) -> tuple[str, str, int]:
) -> tuple[_Statuses | Literal[''], str, int]:
# check for various conditions without bothering the network

for doc_matcher in self.documents_exclude:
Expand Down Expand Up @@ -451,7 +469,7 @@ def _check(
for _ in range(self.retries):
status, info, code = self._check_uri(uri, hyperlink)
if status != 'broken':
break
return status, info, code

return status, info, code

Expand All @@ -464,7 +482,7 @@ def _retrieval_methods(
yield self._session.head, {'allow_redirects': True}
yield self._session.get, {'stream': True}

def _check_uri(self, uri: str, hyperlink: Hyperlink) -> tuple[str, str, int]:
def _check_uri(self, uri: str, hyperlink: Hyperlink) -> tuple[_Statuses, str, int]:
req_url, delimiter, anchor = uri.partition('#')
if delimiter and anchor:
for rex in self.anchors_ignore:
Expand Down Expand Up @@ -556,8 +574,10 @@ def _check_uri(self, uri: str, hyperlink: Hyperlink) -> tuple[str, str, int]:

# Unauthorized: the client did not provide required credentials
if status_code == 401:
status = 'working' if self._allow_unauthorized else 'broken'
return status, 'unauthorized', 0
if self._allow_unauthorized:
return 'working', 'unauthorized', 0
else:
return 'broken', 'unauthorized', 0

# Rate limiting; back-off if allowed, or report failure otherwise
if status_code == 429:
Expand Down
Loading