summaryrefslogtreecommitdiff
path: root/src/certbot_dns_alibabacloud
diff options
context:
space:
mode:
Diffstat (limited to 'src/certbot_dns_alibabacloud')
-rw-r--r--src/certbot_dns_alibabacloud/__init__.py1
-rw-r--r--src/certbot_dns_alibabacloud/_internal/__init__.py1
-rw-r--r--src/certbot_dns_alibabacloud/_internal/dns_alibabacloud.py263
-rw-r--r--src/certbot_dns_alibabacloud/_internal/tests/__init__.py1
-rw-r--r--src/certbot_dns_alibabacloud/_internal/tests/dns_alibabacloud_test.py204
5 files changed, 470 insertions, 0 deletions
diff --git a/src/certbot_dns_alibabacloud/__init__.py b/src/certbot_dns_alibabacloud/__init__.py
new file mode 100644
index 0000000..b673957
--- /dev/null
+++ b/src/certbot_dns_alibabacloud/__init__.py
@@ -0,0 +1 @@
+"""Alibaba Cloud DNS Authenticator plugin for Certbot."""
diff --git a/src/certbot_dns_alibabacloud/_internal/__init__.py b/src/certbot_dns_alibabacloud/_internal/__init__.py
new file mode 100644
index 0000000..e353931
--- /dev/null
+++ b/src/certbot_dns_alibabacloud/_internal/__init__.py
@@ -0,0 +1 @@
+"""Internal implementation of `~certbot_dns_alibabacloud.dns_alibabacloud` plugin."""
diff --git a/src/certbot_dns_alibabacloud/_internal/dns_alibabacloud.py b/src/certbot_dns_alibabacloud/_internal/dns_alibabacloud.py
new file mode 100644
index 0000000..7a43db4
--- /dev/null
+++ b/src/certbot_dns_alibabacloud/_internal/dns_alibabacloud.py
@@ -0,0 +1,263 @@
+"""DNS Authenticator for Alibaba Cloud."""
+
+import logging
+from typing import Any, Callable, Optional
+
+from alibabacloud_alidns20150109 import models as alidns_models
+from alibabacloud_alidns20150109.client import Client as AlidnsClient
+from alibabacloud_alidns20150109.models import DescribeDomainsResponseBodyDomainsDomain as Domain
+from alibabacloud_credentials.client import Client as CredentialClient
+from alibabacloud_credentials.models import Config as CredentialConfig
+from alibabacloud_tea_openapi import models as openapi_models
+from certbot import errors
+from certbot.plugins import dns_common
+from certbot.plugins.dns_common import CredentialsConfiguration
+from Tea.exceptions import TeaException
+
+logger = logging.getLogger(__name__)
+
+
+class Authenticator(dns_common.DNSAuthenticator):
+ """
+ DNS Authenticator for Alibaba Cloud.
+
+ This Authenticator uses the Alibaba Cloud API to fulfill a dns-01 challenge.
+ """
+
+ description = "Obtain certificates using a DNS TXT record (if you are using Alibaba Cloud DNS)."
+ ttl = 600
+
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
+ super().__init__(*args, **kwargs)
+ self.credentials: Optional[CredentialsConfiguration] = None
+
+ @classmethod
+ def add_parser_arguments(
+ cls, add: Callable[..., None], default_propagation_seconds: int = 30
+ ) -> None:
+ super().add_parser_arguments(add, default_propagation_seconds)
+ add("credentials", help="Alibaba Cloud credentials INI file.")
+
+ def more_info(self) -> str:
+ return (
+ "This plugin configures a DNS TXT record to respond to "
+ "a dns-01 challenge using the Alibaba Cloud API."
+ )
+
+ def _setup_credentials(self) -> None:
+ self.credentials = self._configure_credentials(
+ "credentials",
+ "Alibaba Cloud credentials INI file",
+ {
+ "access_key_id": "Access Key ID for Alibaba Cloud API",
+ "access_key_secret": "Access Key Secret for Alibaba Cloud API",
+ },
+ )
+
+ def _perform(self, domain: str, validation_name: str, validation: str) -> None:
+ self._get_alibabacloud_client().add_txt_record(
+ domain, validation_name, validation, self.ttl
+ )
+
+ def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
+ self._get_alibabacloud_client().del_txt_record(domain, validation_name, validation)
+
+ def _get_alibabacloud_client(self) -> "_AlibabaCloudClient":
+ if not self.credentials:
+ raise errors.Error("Plugin has not been prepared.")
+ return _AlibabaCloudClient(
+ str(self.credentials.conf("access_key_id")),
+ str(self.credentials.conf("access_key_secret")),
+ )
+
+
+class _AlibabaCloudClient:
+ """Encapsulates all communication with the Alibaba Cloud API."""
+
+ def __init__(self, access_key_id: str, access_key_secret: str) -> None:
+ config = openapi_models.Config(
+ credential=CredentialClient(
+ CredentialConfig(
+ type="access_key",
+ access_key_id=access_key_id,
+ access_key_secret=access_key_secret,
+ )
+ ),
+ endpoint="alidns.aliyuncs.com",
+ )
+ self.client = AlidnsClient(config)
+
+ def add_txt_record(
+ self, domain: str, record_name: str, record_content: str, record_ttl: int
+ ) -> None:
+ """
+ Add a TXT record using supplied information.
+
+ :param str domain: The domain associated with the record.
+ :param str record_name: The record name (typically beginning with '_acme-challenge.').
+ :param str record_content: The record content (typically the challenge validation).
+ :param int record_ttl: The record TTL (number of seconds that the record may be cached).
+ :raises certbot.errors.PluginError: If an error occurs while communicating with the Alibaba Cloud API.
+ """
+
+ domain_name = self._find_domain_name(domain)
+ rr = self._compute_rr(domain_name, record_name)
+
+ request = alidns_models.AddDomainRecordRequest(
+ domain_name=domain_name,
+ rr=rr,
+ type="TXT",
+ value=record_content,
+ ttl=record_ttl,
+ )
+
+ try:
+ logger.debug("Attempting to add record to domain %s: %s", domain_name, request)
+ response = self.client.add_domain_record(request)
+ except TeaException as e:
+ logger.error("Encountered Alibaba Cloud API error while adding TXT record: %s", e)
+ raise errors.PluginError("Error communicating with the Alibaba Cloud API: {0}".format(e))
+
+ logger.debug("Successfully added TXT record with record_id: %s", response.body.record_id)
+
+ def del_txt_record(self, domain: str, record_name: str, record_content: str) -> None:
+ """
+ Delete a TXT record using the supplied information.
+
+ Note that both the record's name and content are used to ensure that similar records
+ created concurrently (e.g., due to concurrent invocations of this plugin) are not deleted.
+
+ Failures are logged, but not raised.
+
+ :param str domain: The domain associated with the record.
+ :param str record_name: The record name (typically beginning with '_acme-challenge.').
+ :param str record_content: The record content (typically the challenge validation).
+ """
+
+ try:
+ domain_name = self._find_domain_name(domain)
+ except errors.PluginError as e:
+ logger.debug("Encountered error finding domain name during deletion: %s", e)
+ logger.debug("Domain not found; no cleanup needed.")
+ return
+
+ record_id = self._find_txt_record_id(domain_name, record_name, record_content)
+ if record_id:
+ request = alidns_models.DeleteDomainRecordRequest(record_id=record_id)
+ try:
+ self.client.delete_domain_record(request)
+ logger.debug("Successfully deleted TXT record.")
+ except TeaException as e:
+ logger.warning(
+ "Encountered Alibaba Cloud API error while deleting TXT record: %s", e
+ )
+ else:
+ logger.debug("TXT record not found; no cleanup needed.")
+
+ def _find_domain_name(self, domain: str) -> str:
+ """
+ Find the domain name registered on Alibaba Cloud.
+
+ :param str domain: The domain for which to find the corresponding registered domain.
+ :returns: The domain name registered on Alibaba Cloud.
+ :rtype: str
+ :raises certbot.errors.PluginError: If domain could not be matched.
+ """
+
+ domain_name_guesses = dns_common.base_domain_name_guesses(domain)
+ matched_domain: Optional[Domain] = None
+ code = msg = None
+
+ for guess in domain_name_guesses:
+ request = alidns_models.DescribeDomainsRequest(
+ key_word=guess,
+ search_mode="EXACT",
+ )
+
+ try:
+ response = self.client.describe_domains(request)
+ matched_domain = next(iter(response.body.domains.domain), None)
+ except TeaException as e:
+ code = e.code
+ msg = str(e)
+ hint = None
+
+ if code == "InvalidAccessKeyId.NotFound" or code == "SignatureDoesNotMatch":
+ hint = "Did you enter valid Alibaba Cloud API credentials?"
+ elif code == "InvalidAccessKeyId.Inactive":
+ hint = "Did you deactivate the Alibaba Cloud API credentials?"
+
+ if hint:
+ raise errors.PluginError(
+ "Error determining registered domain name: {0} {1} Please confirm "
+ "that you have supplied valid Alibaba Cloud API credentials. ({2})"
+ .format(code, msg, hint)
+ )
+ else:
+ logger.debug(
+ "Unrecognized Alibaba Cloud API error while finding domain name: %s. "
+ "Continuing with next guess...",
+ e,
+ )
+
+ if matched_domain:
+ if matched_domain.domain_name == domain:
+ logger.debug(
+ "Found registered domain %s for %s", matched_domain.domain_name, domain
+ )
+ return matched_domain.domain_name
+ break # Domain mismatch on exact search; safely aborting
+
+ if msg is not None:
+ raise errors.PluginError(
+ "Unable to determine registered domain name for {0} using names: "
+ "{1}. The error from Alibaba Cloud was {2} {3}."
+ .format(domain, domain_name_guesses, code, msg)
+ )
+
+ raise errors.PluginError(
+ "Unable to determine registered domain name for {0} using names: {1}. "
+ "Please confirm that the domain name has been entered correctly "
+ "and is already associated with the supplied Alibaba Cloud account."
+ .format(domain, domain_name_guesses)
+ )
+
+ def _find_txt_record_id(
+ self, domain_name: str, record_name: str, record_content: str
+ ) -> Optional[str]:
+ """
+ Find the record_id for a TXT record with the given name and content.
+
+ :param str domain_name: The domain name registered on Alibaba Cloud.
+ :param str record_name: The record name (typically beginning with '_acme-challenge.').
+ :param str record_content: The record content (typically the challenge validation).
+ :returns: The record_id, if found.
+ :rtype: str
+ """
+
+ rr = self._compute_rr(domain_name, record_name)
+
+ # Combination search mode requires an exact match for the RR, value, and record type.
+ request = alidns_models.DescribeDomainRecordsRequest(
+ domain_name=domain_name,
+ search_mode="COMBINATION",
+ rrkey_word=rr,
+ type="TXT",
+ value_key_word=record_content,
+ )
+
+ try:
+ response = self.client.describe_domain_records(request)
+ except TeaException as e:
+ logger.debug("Encountered Alibaba Cloud API error while getting TXT record_id: %s", e)
+ return None
+
+ records = response.body.domain_records.record
+ if records:
+ return records[0].record_id
+ logger.debug("Unable to find TXT record.")
+ return None
+
+ @staticmethod
+ def _compute_rr(domain: str, full_record_name: str) -> str:
+ return full_record_name.rpartition("." + domain)[0]
diff --git a/src/certbot_dns_alibabacloud/_internal/tests/__init__.py b/src/certbot_dns_alibabacloud/_internal/tests/__init__.py
new file mode 100644
index 0000000..d6c769f
--- /dev/null
+++ b/src/certbot_dns_alibabacloud/_internal/tests/__init__.py
@@ -0,0 +1 @@
+"""certbot-dns-alibabacloud tests"""
diff --git a/src/certbot_dns_alibabacloud/_internal/tests/dns_alibabacloud_test.py b/src/certbot_dns_alibabacloud/_internal/tests/dns_alibabacloud_test.py
new file mode 100644
index 0000000..922547a
--- /dev/null
+++ b/src/certbot_dns_alibabacloud/_internal/tests/dns_alibabacloud_test.py
@@ -0,0 +1,204 @@
+"""Tests for certbot_dns_alibabacloud._internal.dns_alibabacloud."""
+
+import sys
+import unittest
+from unittest import mock
+
+import pytest
+from certbot import errors
+from certbot.compat import os
+from certbot.plugins import dns_test_common
+from certbot.plugins.dns_test_common import DOMAIN
+from certbot.tests import util as test_util
+from Tea.exceptions import TeaException
+
+
+def _make_api_error(code: str) -> TeaException:
+ return TeaException({"code": code, "message": ""})
+
+
+API_ERROR = _make_api_error("InternalError")
+
+ACCESS_KEY_ID = "LTAI5txxxxxxxxxxxxxxxxxx"
+ACCESS_KEY_SECRET = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyy"
+
+
+class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthenticatorTest):
+ def setUp(self) -> None:
+ from certbot_dns_alibabacloud._internal.dns_alibabacloud import Authenticator
+
+ super().setUp()
+
+ path = os.path.join(self.tempdir, "file.ini")
+ dns_test_common.write(
+ {
+ "alibabacloud_access_key_id": ACCESS_KEY_ID,
+ "alibabacloud_access_key_secret": ACCESS_KEY_SECRET,
+ },
+ path,
+ )
+
+ self.config = mock.MagicMock(
+ alibabacloud_credentials=path,
+ alibabacloud_propagation_seconds=0, # don't wait during tests
+ )
+
+ self.auth = Authenticator(self.config, "alibabacloud")
+ self.mock_client = mock.MagicMock()
+ setattr(
+ self.auth, "_get_alibabacloud_client", mock.MagicMock(return_value=self.mock_client)
+ )
+
+ @test_util.patch_display_util()
+ def test_perform(self, unused_mock_get_utility) -> None:
+ self.auth.perform([self.achall])
+
+ expected = [
+ mock.call.add_txt_record(DOMAIN, "_acme-challenge." + DOMAIN, mock.ANY, mock.ANY)
+ ]
+ assert expected == self.mock_client.mock_calls
+
+ @test_util.patch_display_util()
+ def test_cleanup(self, unused_mock_get_utility) -> None:
+ self.auth._attempt_cleanup = True
+ self.auth.cleanup([self.achall])
+
+ expected = [mock.call.del_txt_record(DOMAIN, "_acme-challenge." + DOMAIN, mock.ANY)]
+ assert expected == self.mock_client.mock_calls
+
+ def test_no_creds(self) -> None:
+ dns_test_common.write({}, self.config.alibabacloud_credentials)
+ with pytest.raises(errors.PluginError):
+ self.auth.perform([self.achall])
+
+
+def _mock_domain_response(domain: str | None) -> mock.MagicMock:
+ """Create a mock response for the DescribeDomains API."""
+ response = mock.MagicMock()
+ response.body.domains.domain = [mock.MagicMock(domain_name=domain)]
+ return response
+
+
+def _mock_record_response(record_id: str | None) -> mock.MagicMock:
+ """Create a mock response for the DescribeDomainRecords API."""
+ response = mock.MagicMock()
+ response.body.domain_records.record = [mock.MagicMock(record_id=record_id)]
+ return response
+
+
+class AlibabaCloudClientTest(unittest.TestCase):
+ record_prefix = "_acme-challenge"
+ record_name = record_prefix + "." + DOMAIN
+ record_content = "foo"
+ record_ttl = 600
+ domain_name = DOMAIN
+ record_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
+
+ def setUp(self) -> None:
+ from certbot_dns_alibabacloud._internal.dns_alibabacloud import _AlibabaCloudClient
+
+ self.ac = _AlibabaCloudClient(ACCESS_KEY_ID, ACCESS_KEY_SECRET)
+ self.client = mock.MagicMock()
+ self.ac.client = self.client
+
+ def test_add_txt_record(self) -> None:
+ self.client.describe_domains.return_value = _mock_domain_response(DOMAIN)
+
+ self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl)
+
+ called_request = self.client.add_domain_record.call_args.args[0]
+ assert called_request.domain_name == DOMAIN
+ assert called_request.rr == self.record_prefix
+ assert called_request.type == "TXT"
+ assert called_request.value == self.record_content
+ assert called_request.ttl == self.record_ttl
+
+ def test_add_txt_record_error(self) -> None:
+ self.client.describe_domains.return_value = _mock_domain_response(DOMAIN)
+ self.client.add_domain_record.side_effect = API_ERROR
+
+ with pytest.raises(errors.PluginError):
+ self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl)
+
+ def test_add_txt_record_error_during_domain_lookup(self) -> None:
+ self.client.describe_domains.side_effect = API_ERROR
+
+ with pytest.raises(errors.PluginError):
+ self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl)
+
+ def test_add_txt_record_domain_not_found(self) -> None:
+ self.client.describe_domains.return_value = _mock_domain_response(None)
+
+ with pytest.raises(errors.PluginError):
+ self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl)
+
+ def test_add_txt_record_bad_creds(self) -> None:
+ self.client.describe_domains.side_effect = _make_api_error("InvalidAccessKeyId.NotFound")
+ with pytest.raises(errors.PluginError):
+ self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl)
+
+ self.client.describe_domains.side_effect = _make_api_error("SignatureDoesNotMatch")
+ with pytest.raises(errors.PluginError):
+ self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl)
+
+ self.client.describe_domains.side_effect = _make_api_error("InvalidAccessKeyId.Inactive")
+ with pytest.raises(errors.PluginError):
+ self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl)
+
+ def test_del_txt_record(self) -> None:
+ self.client.describe_domains.return_value = _mock_domain_response(DOMAIN)
+ self.client.describe_domain_records.return_value = _mock_record_response(self.record_id)
+
+ self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content)
+
+ self.client.describe_domains.assert_called_once()
+ self.client.describe_domain_records.assert_called_once()
+ self.client.delete_domain_record.assert_called_once()
+
+ called_request = self.client.describe_domain_records.call_args.args[0]
+ assert called_request.domain_name == DOMAIN
+ assert called_request.type == "TXT"
+ assert called_request.rrkey_word == self.record_prefix
+ assert called_request.value_key_word == self.record_content
+ assert called_request.search_mode == "COMBINATION"
+
+ called_request = self.client.delete_domain_record.call_args.args[0]
+ assert called_request.record_id == self.record_id
+
+ def test_del_txt_record_domain_not_found(self) -> None:
+ self.client.describe_domains.return_value = _mock_domain_response(None)
+
+ self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content)
+ self.client.describe_domains.assert_called_once()
+
+ def test_del_txt_record_error_during_domain_lookup(self) -> None:
+ self.client.describe_domains.side_effect = API_ERROR
+
+ self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content)
+
+ def test_del_txt_record_no_record(self) -> None:
+ self.client.describe_domains.return_value = _mock_domain_response(DOMAIN)
+ self.client.describe_domain_records.return_value = _mock_record_response(None)
+
+ self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content)
+ self.client.describe_domain_records.assert_called_once()
+ self.client.delete_domain_record.assert_not_called()
+
+ def test_del_txt_record_error_during_record_lookup(self) -> None:
+ self.client.describe_domains.return_value = _mock_domain_response(DOMAIN)
+ self.client.describe_domain_records.side_effect = API_ERROR
+
+ self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content)
+ self.client.describe_domain_records.assert_called_once()
+ self.client.delete_domain_record.assert_not_called()
+
+ def test_del_txt_record_error_during_deletion(self) -> None:
+ self.client.describe_domains.return_value = _mock_domain_response(DOMAIN)
+ self.client.delete_domain_record.side_effect = API_ERROR
+
+ self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content)
+ self.client.delete_domain_record.assert_called_once()
+
+
+if __name__ == "__main__":
+ sys.exit(pytest.main(sys.argv[1:] + [__file__]))