"""DNS Authenticator for Alibaba Cloud.""" import logging from typing import Any, Callable, Optional from alibabacloud_alidns20150109 import models as alidns_models from alibabacloud_alidns20150109.client import Client as AlidnsClient from alibabacloud_alidns20150109.models import DescribeDomainsResponseBodyDomainsDomain as Domain from alibabacloud_credentials.client import Client as CredentialClient from alibabacloud_credentials.models import Config as CredentialConfig from alibabacloud_tea_openapi import models as openapi_models from certbot import errors from certbot.plugins import dns_common from certbot.plugins.dns_common import CredentialsConfiguration from Tea.exceptions import TeaException logger = logging.getLogger(__name__) class Authenticator(dns_common.DNSAuthenticator): """ DNS Authenticator for Alibaba Cloud. This Authenticator uses the Alibaba Cloud API to fulfill a dns-01 challenge. """ description = "Obtain certificates using a DNS TXT record (if you are using Alibaba Cloud DNS)." ttl = 600 def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self.credentials: Optional[CredentialsConfiguration] = None @classmethod def add_parser_arguments( cls, add: Callable[..., None], default_propagation_seconds: int = 30 ) -> None: super().add_parser_arguments(add, default_propagation_seconds) add("credentials", help="Alibaba Cloud credentials INI file.") def more_info(self) -> str: return ( "This plugin configures a DNS TXT record to respond to " "a dns-01 challenge using the Alibaba Cloud API." ) def _setup_credentials(self) -> None: self.credentials = self._configure_credentials( "credentials", "Alibaba Cloud credentials INI file", { "access_key_id": "Access Key ID for Alibaba Cloud API", "access_key_secret": "Access Key Secret for Alibaba Cloud API", }, ) def _perform(self, domain: str, validation_name: str, validation: str) -> None: self._get_alibabacloud_client().add_txt_record( domain, validation_name, validation, self.ttl ) def _cleanup(self, domain: str, validation_name: str, validation: str) -> None: self._get_alibabacloud_client().del_txt_record(domain, validation_name, validation) def _get_alibabacloud_client(self) -> "_AlibabaCloudClient": if not self.credentials: raise errors.Error("Plugin has not been prepared.") return _AlibabaCloudClient( str(self.credentials.conf("access_key_id")), str(self.credentials.conf("access_key_secret")), ) class _AlibabaCloudClient: """Encapsulates all communication with the Alibaba Cloud API.""" def __init__(self, access_key_id: str, access_key_secret: str) -> None: config = openapi_models.Config( credential=CredentialClient( CredentialConfig( type="access_key", access_key_id=access_key_id, access_key_secret=access_key_secret, ) ), endpoint="alidns.aliyuncs.com", ) self.client = AlidnsClient(config) def add_txt_record( self, domain: str, record_name: str, record_content: str, record_ttl: int ) -> None: """ Add a TXT record using supplied information. :param str domain: The domain associated with the record. :param str record_name: The record name (typically beginning with '_acme-challenge.'). :param str record_content: The record content (typically the challenge validation). :param int record_ttl: The record TTL (number of seconds that the record may be cached). :raises certbot.errors.PluginError: If an error occurs while communicating with the Alibaba Cloud API. """ domain_name = self._find_domain_name(domain) rr = self._compute_rr(domain_name, record_name) request = alidns_models.AddDomainRecordRequest( domain_name=domain_name, rr=rr, type="TXT", value=record_content, ttl=record_ttl, ) try: logger.debug("Attempting to add record to domain %s: %s", domain_name, request) response = self.client.add_domain_record(request) except TeaException as e: logger.error("Encountered Alibaba Cloud API error while adding TXT record: %s", e) raise errors.PluginError("Error communicating with the Alibaba Cloud API: {0}".format(e)) logger.debug("Successfully added TXT record with record_id: %s", response.body.record_id) def del_txt_record(self, domain: str, record_name: str, record_content: str) -> None: """ Delete a TXT record using the supplied information. Note that both the record's name and content are used to ensure that similar records created concurrently (e.g., due to concurrent invocations of this plugin) are not deleted. Failures are logged, but not raised. :param str domain: The domain associated with the record. :param str record_name: The record name (typically beginning with '_acme-challenge.'). :param str record_content: The record content (typically the challenge validation). """ try: domain_name = self._find_domain_name(domain) except errors.PluginError as e: logger.debug("Encountered error finding domain name during deletion: %s", e) logger.debug("Domain not found; no cleanup needed.") return record_id = self._find_txt_record_id(domain_name, record_name, record_content) if record_id: request = alidns_models.DeleteDomainRecordRequest(record_id=record_id) try: self.client.delete_domain_record(request) logger.debug("Successfully deleted TXT record.") except TeaException as e: logger.warning( "Encountered Alibaba Cloud API error while deleting TXT record: %s", e ) else: logger.debug("TXT record not found; no cleanup needed.") def _find_domain_name(self, domain: str) -> str: """ Find the domain name registered on Alibaba Cloud. :param str domain: The domain for which to find the corresponding registered domain. :returns: The domain name registered on Alibaba Cloud. :rtype: str :raises certbot.errors.PluginError: If domain could not be matched. """ domain_name_guesses = dns_common.base_domain_name_guesses(domain) matched_domain: Optional[Domain] = None code = msg = None for guess in domain_name_guesses: request = alidns_models.DescribeDomainsRequest( key_word=guess, search_mode="EXACT", ) try: response = self.client.describe_domains(request) matched_domain = next(iter(response.body.domains.domain), None) except TeaException as e: code = e.code msg = str(e) hint = None if code == "InvalidAccessKeyId.NotFound" or code == "SignatureDoesNotMatch": hint = "Did you enter valid Alibaba Cloud API credentials?" elif code == "InvalidAccessKeyId.Inactive": hint = "Did you deactivate the Alibaba Cloud API credentials?" if hint: raise errors.PluginError( "Error determining registered domain name: {0} {1} Please confirm " "that you have supplied valid Alibaba Cloud API credentials. ({2})" .format(code, msg, hint) ) else: logger.debug( "Unrecognized Alibaba Cloud API error while finding domain name: %s. " "Continuing with next guess...", e, ) if matched_domain: if matched_domain.domain_name == domain: logger.debug( "Found registered domain %s for %s", matched_domain.domain_name, domain ) return matched_domain.domain_name break # Domain mismatch on exact search; safely aborting if msg is not None: raise errors.PluginError( "Unable to determine registered domain name for {0} using names: " "{1}. The error from Alibaba Cloud was {2} {3}." .format(domain, domain_name_guesses, code, msg) ) raise errors.PluginError( "Unable to determine registered domain name for {0} using names: {1}. " "Please confirm that the domain name has been entered correctly " "and is already associated with the supplied Alibaba Cloud account." .format(domain, domain_name_guesses) ) def _find_txt_record_id( self, domain_name: str, record_name: str, record_content: str ) -> Optional[str]: """ Find the record_id for a TXT record with the given name and content. :param str domain_name: The domain name registered on Alibaba Cloud. :param str record_name: The record name (typically beginning with '_acme-challenge.'). :param str record_content: The record content (typically the challenge validation). :returns: The record_id, if found. :rtype: str """ rr = self._compute_rr(domain_name, record_name) # Combination search mode requires an exact match for the RR, value, and record type. request = alidns_models.DescribeDomainRecordsRequest( domain_name=domain_name, search_mode="COMBINATION", rrkey_word=rr, type="TXT", value_key_word=record_content, ) try: response = self.client.describe_domain_records(request) except TeaException as e: logger.debug("Encountered Alibaba Cloud API error while getting TXT record_id: %s", e) return None records = response.body.domain_records.record if records: return records[0].record_id logger.debug("Unable to find TXT record.") return None @staticmethod def _compute_rr(domain: str, full_record_name: str) -> str: return full_record_name.rpartition("." + domain)[0]