diff options
| -rw-r--r-- | .gitignore | 1 | ||||
| -rw-r--r-- | src/certbot_dns_alibabacloud/__init__.py | 1 | ||||
| -rw-r--r-- | src/certbot_dns_alibabacloud/_internal/__init__.py | 1 | ||||
| -rw-r--r-- | src/certbot_dns_alibabacloud/_internal/dns_alibabacloud.py | 263 | ||||
| -rw-r--r-- | src/certbot_dns_alibabacloud/_internal/tests/__init__.py | 1 | ||||
| -rw-r--r-- | src/certbot_dns_alibabacloud/_internal/tests/dns_alibabacloud_test.py | 204 |
6 files changed, 471 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3feb78a --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.egg-info/ diff --git a/src/certbot_dns_alibabacloud/__init__.py b/src/certbot_dns_alibabacloud/__init__.py new file mode 100644 index 0000000..b673957 --- /dev/null +++ b/src/certbot_dns_alibabacloud/__init__.py @@ -0,0 +1 @@ +"""Alibaba Cloud DNS Authenticator plugin for Certbot.""" diff --git a/src/certbot_dns_alibabacloud/_internal/__init__.py b/src/certbot_dns_alibabacloud/_internal/__init__.py new file mode 100644 index 0000000..e353931 --- /dev/null +++ b/src/certbot_dns_alibabacloud/_internal/__init__.py @@ -0,0 +1 @@ +"""Internal implementation of `~certbot_dns_alibabacloud.dns_alibabacloud` plugin.""" diff --git a/src/certbot_dns_alibabacloud/_internal/dns_alibabacloud.py b/src/certbot_dns_alibabacloud/_internal/dns_alibabacloud.py new file mode 100644 index 0000000..7a43db4 --- /dev/null +++ b/src/certbot_dns_alibabacloud/_internal/dns_alibabacloud.py @@ -0,0 +1,263 @@ +"""DNS Authenticator for Alibaba Cloud.""" + +import logging +from typing import Any, Callable, Optional + +from alibabacloud_alidns20150109 import models as alidns_models +from alibabacloud_alidns20150109.client import Client as AlidnsClient +from alibabacloud_alidns20150109.models import DescribeDomainsResponseBodyDomainsDomain as Domain +from alibabacloud_credentials.client import Client as CredentialClient +from alibabacloud_credentials.models import Config as CredentialConfig +from alibabacloud_tea_openapi import models as openapi_models +from certbot import errors +from certbot.plugins import dns_common +from certbot.plugins.dns_common import CredentialsConfiguration +from Tea.exceptions import TeaException + +logger = logging.getLogger(__name__) + + +class Authenticator(dns_common.DNSAuthenticator): + """ + DNS Authenticator for Alibaba Cloud. + + This Authenticator uses the Alibaba Cloud API to fulfill a dns-01 challenge. + """ + + description = "Obtain certificates using a DNS TXT record (if you are using Alibaba Cloud DNS)." + ttl = 600 + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.credentials: Optional[CredentialsConfiguration] = None + + @classmethod + def add_parser_arguments( + cls, add: Callable[..., None], default_propagation_seconds: int = 30 + ) -> None: + super().add_parser_arguments(add, default_propagation_seconds) + add("credentials", help="Alibaba Cloud credentials INI file.") + + def more_info(self) -> str: + return ( + "This plugin configures a DNS TXT record to respond to " + "a dns-01 challenge using the Alibaba Cloud API." + ) + + def _setup_credentials(self) -> None: + self.credentials = self._configure_credentials( + "credentials", + "Alibaba Cloud credentials INI file", + { + "access_key_id": "Access Key ID for Alibaba Cloud API", + "access_key_secret": "Access Key Secret for Alibaba Cloud API", + }, + ) + + def _perform(self, domain: str, validation_name: str, validation: str) -> None: + self._get_alibabacloud_client().add_txt_record( + domain, validation_name, validation, self.ttl + ) + + def _cleanup(self, domain: str, validation_name: str, validation: str) -> None: + self._get_alibabacloud_client().del_txt_record(domain, validation_name, validation) + + def _get_alibabacloud_client(self) -> "_AlibabaCloudClient": + if not self.credentials: + raise errors.Error("Plugin has not been prepared.") + return _AlibabaCloudClient( + str(self.credentials.conf("access_key_id")), + str(self.credentials.conf("access_key_secret")), + ) + + +class _AlibabaCloudClient: + """Encapsulates all communication with the Alibaba Cloud API.""" + + def __init__(self, access_key_id: str, access_key_secret: str) -> None: + config = openapi_models.Config( + credential=CredentialClient( + CredentialConfig( + type="access_key", + access_key_id=access_key_id, + access_key_secret=access_key_secret, + ) + ), + endpoint="alidns.aliyuncs.com", + ) + self.client = AlidnsClient(config) + + def add_txt_record( + self, domain: str, record_name: str, record_content: str, record_ttl: int + ) -> None: + """ + Add a TXT record using supplied information. + + :param str domain: The domain associated with the record. + :param str record_name: The record name (typically beginning with '_acme-challenge.'). + :param str record_content: The record content (typically the challenge validation). + :param int record_ttl: The record TTL (number of seconds that the record may be cached). + :raises certbot.errors.PluginError: If an error occurs while communicating with the Alibaba Cloud API. + """ + + domain_name = self._find_domain_name(domain) + rr = self._compute_rr(domain_name, record_name) + + request = alidns_models.AddDomainRecordRequest( + domain_name=domain_name, + rr=rr, + type="TXT", + value=record_content, + ttl=record_ttl, + ) + + try: + logger.debug("Attempting to add record to domain %s: %s", domain_name, request) + response = self.client.add_domain_record(request) + except TeaException as e: + logger.error("Encountered Alibaba Cloud API error while adding TXT record: %s", e) + raise errors.PluginError("Error communicating with the Alibaba Cloud API: {0}".format(e)) + + logger.debug("Successfully added TXT record with record_id: %s", response.body.record_id) + + def del_txt_record(self, domain: str, record_name: str, record_content: str) -> None: + """ + Delete a TXT record using the supplied information. + + Note that both the record's name and content are used to ensure that similar records + created concurrently (e.g., due to concurrent invocations of this plugin) are not deleted. + + Failures are logged, but not raised. + + :param str domain: The domain associated with the record. + :param str record_name: The record name (typically beginning with '_acme-challenge.'). + :param str record_content: The record content (typically the challenge validation). + """ + + try: + domain_name = self._find_domain_name(domain) + except errors.PluginError as e: + logger.debug("Encountered error finding domain name during deletion: %s", e) + logger.debug("Domain not found; no cleanup needed.") + return + + record_id = self._find_txt_record_id(domain_name, record_name, record_content) + if record_id: + request = alidns_models.DeleteDomainRecordRequest(record_id=record_id) + try: + self.client.delete_domain_record(request) + logger.debug("Successfully deleted TXT record.") + except TeaException as e: + logger.warning( + "Encountered Alibaba Cloud API error while deleting TXT record: %s", e + ) + else: + logger.debug("TXT record not found; no cleanup needed.") + + def _find_domain_name(self, domain: str) -> str: + """ + Find the domain name registered on Alibaba Cloud. + + :param str domain: The domain for which to find the corresponding registered domain. + :returns: The domain name registered on Alibaba Cloud. + :rtype: str + :raises certbot.errors.PluginError: If domain could not be matched. + """ + + domain_name_guesses = dns_common.base_domain_name_guesses(domain) + matched_domain: Optional[Domain] = None + code = msg = None + + for guess in domain_name_guesses: + request = alidns_models.DescribeDomainsRequest( + key_word=guess, + search_mode="EXACT", + ) + + try: + response = self.client.describe_domains(request) + matched_domain = next(iter(response.body.domains.domain), None) + except TeaException as e: + code = e.code + msg = str(e) + hint = None + + if code == "InvalidAccessKeyId.NotFound" or code == "SignatureDoesNotMatch": + hint = "Did you enter valid Alibaba Cloud API credentials?" + elif code == "InvalidAccessKeyId.Inactive": + hint = "Did you deactivate the Alibaba Cloud API credentials?" + + if hint: + raise errors.PluginError( + "Error determining registered domain name: {0} {1} Please confirm " + "that you have supplied valid Alibaba Cloud API credentials. ({2})" + .format(code, msg, hint) + ) + else: + logger.debug( + "Unrecognized Alibaba Cloud API error while finding domain name: %s. " + "Continuing with next guess...", + e, + ) + + if matched_domain: + if matched_domain.domain_name == domain: + logger.debug( + "Found registered domain %s for %s", matched_domain.domain_name, domain + ) + return matched_domain.domain_name + break # Domain mismatch on exact search; safely aborting + + if msg is not None: + raise errors.PluginError( + "Unable to determine registered domain name for {0} using names: " + "{1}. The error from Alibaba Cloud was {2} {3}." + .format(domain, domain_name_guesses, code, msg) + ) + + raise errors.PluginError( + "Unable to determine registered domain name for {0} using names: {1}. " + "Please confirm that the domain name has been entered correctly " + "and is already associated with the supplied Alibaba Cloud account." + .format(domain, domain_name_guesses) + ) + + def _find_txt_record_id( + self, domain_name: str, record_name: str, record_content: str + ) -> Optional[str]: + """ + Find the record_id for a TXT record with the given name and content. + + :param str domain_name: The domain name registered on Alibaba Cloud. + :param str record_name: The record name (typically beginning with '_acme-challenge.'). + :param str record_content: The record content (typically the challenge validation). + :returns: The record_id, if found. + :rtype: str + """ + + rr = self._compute_rr(domain_name, record_name) + + # Combination search mode requires an exact match for the RR, value, and record type. + request = alidns_models.DescribeDomainRecordsRequest( + domain_name=domain_name, + search_mode="COMBINATION", + rrkey_word=rr, + type="TXT", + value_key_word=record_content, + ) + + try: + response = self.client.describe_domain_records(request) + except TeaException as e: + logger.debug("Encountered Alibaba Cloud API error while getting TXT record_id: %s", e) + return None + + records = response.body.domain_records.record + if records: + return records[0].record_id + logger.debug("Unable to find TXT record.") + return None + + @staticmethod + def _compute_rr(domain: str, full_record_name: str) -> str: + return full_record_name.rpartition("." + domain)[0] diff --git a/src/certbot_dns_alibabacloud/_internal/tests/__init__.py b/src/certbot_dns_alibabacloud/_internal/tests/__init__.py new file mode 100644 index 0000000..d6c769f --- /dev/null +++ b/src/certbot_dns_alibabacloud/_internal/tests/__init__.py @@ -0,0 +1 @@ +"""certbot-dns-alibabacloud tests""" diff --git a/src/certbot_dns_alibabacloud/_internal/tests/dns_alibabacloud_test.py b/src/certbot_dns_alibabacloud/_internal/tests/dns_alibabacloud_test.py new file mode 100644 index 0000000..922547a --- /dev/null +++ b/src/certbot_dns_alibabacloud/_internal/tests/dns_alibabacloud_test.py @@ -0,0 +1,204 @@ +"""Tests for certbot_dns_alibabacloud._internal.dns_alibabacloud.""" + +import sys +import unittest +from unittest import mock + +import pytest +from certbot import errors +from certbot.compat import os +from certbot.plugins import dns_test_common +from certbot.plugins.dns_test_common import DOMAIN +from certbot.tests import util as test_util +from Tea.exceptions import TeaException + + +def _make_api_error(code: str) -> TeaException: + return TeaException({"code": code, "message": ""}) + + +API_ERROR = _make_api_error("InternalError") + +ACCESS_KEY_ID = "LTAI5txxxxxxxxxxxxxxxxxx" +ACCESS_KEY_SECRET = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyy" + + +class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthenticatorTest): + def setUp(self) -> None: + from certbot_dns_alibabacloud._internal.dns_alibabacloud import Authenticator + + super().setUp() + + path = os.path.join(self.tempdir, "file.ini") + dns_test_common.write( + { + "alibabacloud_access_key_id": ACCESS_KEY_ID, + "alibabacloud_access_key_secret": ACCESS_KEY_SECRET, + }, + path, + ) + + self.config = mock.MagicMock( + alibabacloud_credentials=path, + alibabacloud_propagation_seconds=0, # don't wait during tests + ) + + self.auth = Authenticator(self.config, "alibabacloud") + self.mock_client = mock.MagicMock() + setattr( + self.auth, "_get_alibabacloud_client", mock.MagicMock(return_value=self.mock_client) + ) + + @test_util.patch_display_util() + def test_perform(self, unused_mock_get_utility) -> None: + self.auth.perform([self.achall]) + + expected = [ + mock.call.add_txt_record(DOMAIN, "_acme-challenge." + DOMAIN, mock.ANY, mock.ANY) + ] + assert expected == self.mock_client.mock_calls + + @test_util.patch_display_util() + def test_cleanup(self, unused_mock_get_utility) -> None: + self.auth._attempt_cleanup = True + self.auth.cleanup([self.achall]) + + expected = [mock.call.del_txt_record(DOMAIN, "_acme-challenge." + DOMAIN, mock.ANY)] + assert expected == self.mock_client.mock_calls + + def test_no_creds(self) -> None: + dns_test_common.write({}, self.config.alibabacloud_credentials) + with pytest.raises(errors.PluginError): + self.auth.perform([self.achall]) + + +def _mock_domain_response(domain: str | None) -> mock.MagicMock: + """Create a mock response for the DescribeDomains API.""" + response = mock.MagicMock() + response.body.domains.domain = [mock.MagicMock(domain_name=domain)] + return response + + +def _mock_record_response(record_id: str | None) -> mock.MagicMock: + """Create a mock response for the DescribeDomainRecords API.""" + response = mock.MagicMock() + response.body.domain_records.record = [mock.MagicMock(record_id=record_id)] + return response + + +class AlibabaCloudClientTest(unittest.TestCase): + record_prefix = "_acme-challenge" + record_name = record_prefix + "." + DOMAIN + record_content = "foo" + record_ttl = 600 + domain_name = DOMAIN + record_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + def setUp(self) -> None: + from certbot_dns_alibabacloud._internal.dns_alibabacloud import _AlibabaCloudClient + + self.ac = _AlibabaCloudClient(ACCESS_KEY_ID, ACCESS_KEY_SECRET) + self.client = mock.MagicMock() + self.ac.client = self.client + + def test_add_txt_record(self) -> None: + self.client.describe_domains.return_value = _mock_domain_response(DOMAIN) + + self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + + called_request = self.client.add_domain_record.call_args.args[0] + assert called_request.domain_name == DOMAIN + assert called_request.rr == self.record_prefix + assert called_request.type == "TXT" + assert called_request.value == self.record_content + assert called_request.ttl == self.record_ttl + + def test_add_txt_record_error(self) -> None: + self.client.describe_domains.return_value = _mock_domain_response(DOMAIN) + self.client.add_domain_record.side_effect = API_ERROR + + with pytest.raises(errors.PluginError): + self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + + def test_add_txt_record_error_during_domain_lookup(self) -> None: + self.client.describe_domains.side_effect = API_ERROR + + with pytest.raises(errors.PluginError): + self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + + def test_add_txt_record_domain_not_found(self) -> None: + self.client.describe_domains.return_value = _mock_domain_response(None) + + with pytest.raises(errors.PluginError): + self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + + def test_add_txt_record_bad_creds(self) -> None: + self.client.describe_domains.side_effect = _make_api_error("InvalidAccessKeyId.NotFound") + with pytest.raises(errors.PluginError): + self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + + self.client.describe_domains.side_effect = _make_api_error("SignatureDoesNotMatch") + with pytest.raises(errors.PluginError): + self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + + self.client.describe_domains.side_effect = _make_api_error("InvalidAccessKeyId.Inactive") + with pytest.raises(errors.PluginError): + self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + + def test_del_txt_record(self) -> None: + self.client.describe_domains.return_value = _mock_domain_response(DOMAIN) + self.client.describe_domain_records.return_value = _mock_record_response(self.record_id) + + self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content) + + self.client.describe_domains.assert_called_once() + self.client.describe_domain_records.assert_called_once() + self.client.delete_domain_record.assert_called_once() + + called_request = self.client.describe_domain_records.call_args.args[0] + assert called_request.domain_name == DOMAIN + assert called_request.type == "TXT" + assert called_request.rrkey_word == self.record_prefix + assert called_request.value_key_word == self.record_content + assert called_request.search_mode == "COMBINATION" + + called_request = self.client.delete_domain_record.call_args.args[0] + assert called_request.record_id == self.record_id + + def test_del_txt_record_domain_not_found(self) -> None: + self.client.describe_domains.return_value = _mock_domain_response(None) + + self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content) + self.client.describe_domains.assert_called_once() + + def test_del_txt_record_error_during_domain_lookup(self) -> None: + self.client.describe_domains.side_effect = API_ERROR + + self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content) + + def test_del_txt_record_no_record(self) -> None: + self.client.describe_domains.return_value = _mock_domain_response(DOMAIN) + self.client.describe_domain_records.return_value = _mock_record_response(None) + + self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content) + self.client.describe_domain_records.assert_called_once() + self.client.delete_domain_record.assert_not_called() + + def test_del_txt_record_error_during_record_lookup(self) -> None: + self.client.describe_domains.return_value = _mock_domain_response(DOMAIN) + self.client.describe_domain_records.side_effect = API_ERROR + + self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content) + self.client.describe_domain_records.assert_called_once() + self.client.delete_domain_record.assert_not_called() + + def test_del_txt_record_error_during_deletion(self) -> None: + self.client.describe_domains.return_value = _mock_domain_response(DOMAIN) + self.client.delete_domain_record.side_effect = API_ERROR + + self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content) + self.client.delete_domain_record.assert_called_once() + + +if __name__ == "__main__": + sys.exit(pytest.main(sys.argv[1:] + [__file__])) |
