-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature : AlephDNS #47
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -79,6 +79,7 @@ testing = | |
black | ||
isort | ||
flake8 | ||
aiodns | ||
mqtt = | ||
aiomqtt<=0.1.3 | ||
certifi | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
import aiodns | ||
import re | ||
from .conf import settings | ||
from typing import Optional | ||
from aleph.sdk.exceptions import DomainConfigurationError | ||
|
||
|
||
class AlephDNS: | ||
def __init__(self): | ||
self.resolver = aiodns.DNSResolver(servers=settings.DNS_RESOLVERS) | ||
self.fqdn_matcher = re.compile(r"https?://?") | ||
|
||
async def query(self, name: str, query_type: str): | ||
try: | ||
return await self.resolver.query(name, query_type) | ||
except Exception as e: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @aliel what is the exception that we are expected here ? Catching all exceptions to stdout is not a best practice. |
||
print(e) | ||
return None | ||
|
||
def url_to_domain(self, url): | ||
return self.fqdn_matcher.sub("", url).strip().strip("/") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can simplify the return self.fqdn_matcher.sub("", url).strip(" /") |
||
|
||
async def get_ipv6_address(self, url: str): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What type does this return ? |
||
domain = self.url_to_domain(url) | ||
ipv6 = [] | ||
query = await self.query(domain, "AAAA") | ||
if query: | ||
for entry in query: | ||
ipv6.append(entry.host) | ||
return ipv6 | ||
|
||
async def get_dnslink(self, url: str): | ||
domain = self.url_to_domain(url) | ||
query = await self.query(f"_dnslink.{domain}", "TXT") | ||
if query is not None and len(query) > 0: | ||
return query[0].text | ||
|
||
async def check_domain_configured(self, domain, target, owner): | ||
try: | ||
print("Check...", target) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not use |
||
return await self.check_domain(domain, target, owner) | ||
except Exception as error: | ||
raise DomainConfigurationError(error) | ||
|
||
async def check_domain( | ||
self, url: str, target: str, owner: Optional[str] = None | ||
): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a docstring to explain what this is checking ? |
||
status = {"cname": False, "owner_proof": False} | ||
|
||
target = target.lower() | ||
domain = self.url_to_domain(url) | ||
|
||
dns_rules = self.get_required_dns_rules(url, target, owner) | ||
|
||
for dns_rule in dns_rules: | ||
status[dns_rule["rule_name"]] = False | ||
|
||
record_name = dns_rule["dns"]["name"] | ||
record_type = dns_rule["dns"]["type"] | ||
record_value = dns_rule["dns"]["value"] | ||
|
||
res = await self.query(record_name, record_type.upper()) | ||
|
||
if record_type == "txt": | ||
found = False | ||
|
||
for _res in res: | ||
if hasattr(_res, "text") and _res.text == record_value: | ||
found = True | ||
|
||
if found == False: | ||
raise DomainConfigurationError( | ||
(dns_rule["info"], dns_rule["on_error"], status) | ||
) | ||
Comment on lines
+67
to
+74
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not use a for _res in res:
if hasattr(_res, "text") and _res.text == record_value:
break
else:
raise DomainConfigurationError(
(dns_rule["info"], dns_rule["on_error"], status)
) |
||
|
||
elif res is None or not hasattr(res, record_type) or getattr(res, record_type) != record_value: | ||
raise DomainConfigurationError( | ||
(dns_rule["info"], dns_rule["on_error"], status) | ||
) | ||
|
||
status[dns_rule["rule_name"]] = True | ||
|
||
return status | ||
|
||
def get_required_dns_rules(self, url, target, owner: Optional[str] = None): | ||
domain = self.url_to_domain(url) | ||
target = target.lower() | ||
dns_rules = [] | ||
|
||
if target == "ipfs": | ||
cname_value = settings.DNS_IPFS_DOMAIN | ||
elif target == "program": | ||
cname_value = settings.DNS_PROGRAM_DOMAIN | ||
elif target == "instance": | ||
cname_value = f"{domain}.{settings.DNS_INSTANCE_DOMAIN}" | ||
|
||
# cname rule | ||
dns_rules.append({ | ||
"rule_name": "cname", | ||
"dns": { | ||
"type": "cname", | ||
"name": domain, | ||
"value": cname_value | ||
}, | ||
"info": f"Create a CNAME record for {domain} with value {cname_value}", | ||
"on_error": f"CNAME record not found: {domain}" | ||
}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it make sense to use a dataclasses instead of a dict for this data ? |
||
|
||
if target == "ipfs": | ||
# ipfs rule | ||
dns_rules.append({ | ||
"rule_name": "delegation", | ||
"dns": { | ||
"type": "cname", | ||
"name": f"_dnslink.{domain}", | ||
"value": f"_dnslink.{domain}.{settings.DNS_ROOT_DOMAIN}" | ||
}, | ||
"info": f"Create a CNAME record for _dnslink.{domain} with value _dnslink.{domain}.{settings.DNS_ROOT_DOMAIN}", | ||
"on_error": f"CNAME record not found: _dnslink.{domain}" | ||
}) | ||
|
||
if owner: | ||
# ownership rule | ||
dns_rules.append({ | ||
"rule_name": "owner_proof", | ||
"dns": { | ||
"type": "txt", | ||
"name": f"_control.{domain}", | ||
"value": owner | ||
}, | ||
"info": f"Create a TXT record for _control.{domain} with value = owner address", | ||
"on_error": f"Owner address mismatch" | ||
}) | ||
|
||
return dns_rules |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
import pytest | ||
import asyncio | ||
|
||
from aleph.sdk.domain import AlephDNS | ||
from aleph.sdk.exceptions import DomainConfigurationError | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_url_to_domain(): | ||
alephdns = AlephDNS() | ||
domain = alephdns.url_to_domain("https://aleph.im") | ||
query = await alephdns.query(domain, "A") | ||
assert query is not None | ||
assert len(query) > 0 | ||
assert hasattr(query[0], "host") | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_get_ipv6_address(): | ||
alephdns = AlephDNS() | ||
url = "https://aleph.im" | ||
ipv6_address = await alephdns.get_ipv6_address(url) | ||
assert ipv6_address is not None | ||
assert len(ipv6_address) > 0 | ||
assert ":" in ipv6_address[0] | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_dnslink(): | ||
alephdns = AlephDNS() | ||
url = "https://aleph.im" | ||
dnslink = await alephdns.get_dnslink(url) | ||
assert dnslink is not None | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_configured_domain(): | ||
alephdns = AlephDNS() | ||
url = 'https://custom-domain-unit-test.aleph.sh' | ||
status = await alephdns.check_domain(url, "ipfs", "0xfakeaddress") | ||
assert type(status) is dict | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_not_configured_domain(): | ||
alephdns = AlephDNS() | ||
url = 'https://not-configured-domain.aleph.sh' | ||
with pytest.raises(DomainConfigurationError): | ||
status = await alephdns.check_domain(url, "ipfs", "0xfakeaddress") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the second
?
at the end ?