summaryrefslogtreecommitdiff
path: root/src/certbot_dns_alibabacloud/_internal/dns_alibabacloud.py
blob: 7a43db49673ea70f8f11253aed9901b00bc6db8d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
"""DNS Authenticator for Alibaba Cloud."""

import logging
from typing import Any, Callable, Optional

from alibabacloud_alidns20150109 import models as alidns_models
from alibabacloud_alidns20150109.client import Client as AlidnsClient
from alibabacloud_alidns20150109.models import DescribeDomainsResponseBodyDomainsDomain as Domain
from alibabacloud_credentials.client import Client as CredentialClient
from alibabacloud_credentials.models import Config as CredentialConfig
from alibabacloud_tea_openapi import models as openapi_models
from certbot import errors
from certbot.plugins import dns_common
from certbot.plugins.dns_common import CredentialsConfiguration
from Tea.exceptions import TeaException

logger = logging.getLogger(__name__)


class Authenticator(dns_common.DNSAuthenticator):
    """
    Certbot DNS authenticator backed by Alibaba Cloud DNS.

    A dns-01 challenge is fulfilled by creating the validation TXT record through the
    Alibaba Cloud API; the record is deleted again during cleanup.
    """

    description = "Obtain certificates using a DNS TXT record (if you are using Alibaba Cloud DNS)."
    # TTL (seconds) passed along with every TXT record created in ``_perform``.
    ttl = 600

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the base class; credentials stay unset until ``_setup_credentials`` runs."""
        super().__init__(*args, **kwargs)
        self.credentials: Optional[CredentialsConfiguration] = None

    @classmethod
    def add_parser_arguments(
        cls, add: Callable[..., None], default_propagation_seconds: int = 30
    ) -> None:
        """
        Register the base class's arguments plus the ``credentials`` option.

        :param add: Callback used to register a command line argument.
        :param int default_propagation_seconds: Default forwarded to the base class.
        """
        super().add_parser_arguments(add, default_propagation_seconds)
        add("credentials", help="Alibaba Cloud credentials INI file.")

    def more_info(self) -> str:
        """Return a short human-readable description of what this plugin does."""
        info = (
            "This plugin configures a DNS TXT record to respond to a dns-01 challenge "
            "using the Alibaba Cloud API."
        )
        return info

    def _setup_credentials(self) -> None:
        """Load the credentials INI file, requiring both access key entries."""
        required_keys = {
            "access_key_id": "Access Key ID for Alibaba Cloud API",
            "access_key_secret": "Access Key Secret for Alibaba Cloud API",
        }
        self.credentials = self._configure_credentials(
            "credentials",
            "Alibaba Cloud credentials INI file",
            required_keys,
        )

    def _perform(self, domain: str, validation_name: str, validation: str) -> None:
        """Create the challenge TXT record using the class-level TTL."""
        client = self._get_alibabacloud_client()
        client.add_txt_record(domain, validation_name, validation, self.ttl)

    def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
        """Delete the challenge TXT record that ``_perform`` created."""
        client = self._get_alibabacloud_client()
        client.del_txt_record(domain, validation_name, validation)

    def _get_alibabacloud_client(self) -> "_AlibabaCloudClient":
        """
        Build a fresh API client from the configured credentials.

        :raises certbot.errors.Error: If no credentials have been configured yet.
        """
        credentials = self.credentials
        if not credentials:
            raise errors.Error("Plugin has not been prepared.")

        key_id = str(credentials.conf("access_key_id"))
        key_secret = str(credentials.conf("access_key_secret"))
        return _AlibabaCloudClient(key_id, key_secret)


class _AlibabaCloudClient:
    """
    Encapsulates all communication with the Alibaba Cloud API.

    One Alidns SDK client is built from an access key pair in ``__init__`` and reused for
    every request issued through this object.
    """

    def __init__(self, access_key_id: str, access_key_secret: str) -> None:
        """
        Build the Alidns SDK client from a static access key pair.

        :param str access_key_id: The Access Key ID used to sign requests.
        :param str access_key_secret: The Access Key Secret used to sign requests.
        """
        config = openapi_models.Config(
            credential=CredentialClient(
                CredentialConfig(
                    type="access_key",
                    access_key_id=access_key_id,
                    access_key_secret=access_key_secret,
                )
            ),
            endpoint="alidns.aliyuncs.com",
        )
        self.client = AlidnsClient(config)

    def add_txt_record(
        self, domain: str, record_name: str, record_content: str, record_ttl: int
    ) -> None:
        """
        Add a TXT record using supplied information.

        :param str domain: The domain associated with the record.
        :param str record_name: The record name (typically beginning with '_acme-challenge.').
        :param str record_content: The record content (typically the challenge validation).
        :param int record_ttl: The record TTL (number of seconds that the record may be cached).
        :raises certbot.errors.PluginError: If the registered domain cannot be determined or an
            error occurs while communicating with the Alibaba Cloud API.
        """

        domain_name = self._find_domain_name(domain)
        rr = self._compute_rr(domain_name, record_name)

        request = alidns_models.AddDomainRecordRequest(
            domain_name=domain_name,
            rr=rr,
            type="TXT",
            value=record_content,
            ttl=record_ttl,
        )

        try:
            logger.debug("Attempting to add record to domain %s: %s", domain_name, request)
            response = self.client.add_domain_record(request)
        except TeaException as e:
            logger.error("Encountered Alibaba Cloud API error while adding TXT record: %s", e)
            # Chain the SDK error so the original traceback is preserved for debugging.
            raise errors.PluginError(
                "Error communicating with the Alibaba Cloud API: {0}".format(e)
            ) from e

        logger.debug("Successfully added TXT record with record_id: %s", response.body.record_id)

    def del_txt_record(self, domain: str, record_name: str, record_content: str) -> None:
        """
        Delete a TXT record using the supplied information.

        Note that both the record's name and content are used to ensure that similar records
        created concurrently (e.g., due to concurrent invocations of this plugin) are not deleted.

        Failures are logged, but not raised.

        :param str domain: The domain associated with the record.
        :param str record_name: The record name (typically beginning with '_acme-challenge.').
        :param str record_content: The record content (typically the challenge validation).
        """

        try:
            domain_name = self._find_domain_name(domain)
        except errors.PluginError as e:
            logger.debug("Encountered error finding domain name during deletion: %s", e)
            logger.debug("Domain not found; no cleanup needed.")
            return

        record_id = self._find_txt_record_id(domain_name, record_name, record_content)
        if not record_id:
            logger.debug("TXT record not found; no cleanup needed.")
            return

        request = alidns_models.DeleteDomainRecordRequest(record_id=record_id)
        try:
            self.client.delete_domain_record(request)
            logger.debug("Successfully deleted TXT record.")
        except TeaException as e:
            # Cleanup is best-effort: report the failure but do not abort the run.
            logger.warning(
                "Encountered Alibaba Cloud API error while deleting TXT record: %s", e
            )

    def _find_domain_name(self, domain: str) -> str:
        """
        Find the domain name registered on Alibaba Cloud.

        Every candidate produced by ``dns_common.base_domain_name_guesses`` is looked up with an
        exact-match search; the first candidate the API returns is the registered domain.

        :param str domain: The domain for which to find the corresponding registered domain.
        :returns: The domain name registered on Alibaba Cloud.
        :rtype: str
        :raises certbot.errors.PluginError: If domain could not be matched, or if the API
            rejects the supplied credentials.
        """

        domain_name_guesses = dns_common.base_domain_name_guesses(domain)
        matched_domain: Optional[Domain] = None
        code = msg = None

        for guess in domain_name_guesses:
            request = alidns_models.DescribeDomainsRequest(
                key_word=guess,
                search_mode="EXACT",
            )

            # Start every guess from a clean slate so no result leaks between iterations.
            matched_domain = None
            try:
                response = self.client.describe_domains(request)
                # Tolerate a missing (None) domain list in addition to an empty one.
                matched_domain = next(iter(response.body.domains.domain or ()), None)
            except TeaException as e:
                code = e.code
                msg = str(e)
                hint = None

                if code in ("InvalidAccessKeyId.NotFound", "SignatureDoesNotMatch"):
                    hint = "Did you enter valid Alibaba Cloud API credentials?"
                elif code == "InvalidAccessKeyId.Inactive":
                    hint = "Did you deactivate the Alibaba Cloud API credentials?"

                if hint:
                    # Credential problems will not go away with another guess; fail fast.
                    raise errors.PluginError(
                        "Error determining registered domain name: {0} {1} Please confirm "
                        "that you have supplied valid Alibaba Cloud API credentials. ({2})"
                        .format(code, msg, hint)
                    ) from e

                logger.debug(
                    "Unrecognized Alibaba Cloud API error while finding domain name: %s. "
                    "Continuing with next guess...",
                    e,
                )

            if matched_domain:
                # The result must equal the name that was searched for (the current guess),
                # not the certificate name: comparing against ``domain`` made every
                # subdomain fail, because its registered domain is a shorter guess.
                if matched_domain.domain_name == guess:
                    logger.debug(
                        "Found registered domain %s for %s", matched_domain.domain_name, domain
                    )
                    return matched_domain.domain_name
                break  # Domain mismatch on exact search; safely aborting

        if msg is not None:
            raise errors.PluginError(
                "Unable to determine registered domain name for {0} using names: "
                "{1}. The error from Alibaba Cloud was {2} {3}."
                .format(domain, domain_name_guesses, code, msg)
            )

        raise errors.PluginError(
            "Unable to determine registered domain name for {0} using names: {1}. "
            "Please confirm that the domain name has been entered correctly "
            "and is already associated with the supplied Alibaba Cloud account."
            .format(domain, domain_name_guesses)
        )

    def _find_txt_record_id(
        self, domain_name: str, record_name: str, record_content: str
    ) -> Optional[str]:
        """
        Find the record_id for a TXT record with the given name and content.

        API errors are logged at debug level and reported as "not found".

        :param str domain_name: The domain name registered on Alibaba Cloud.
        :param str record_name: The record name (typically beginning with '_acme-challenge.').
        :param str record_content: The record content (typically the challenge validation).
        :returns: The record_id of the first record returned, or None if there is none.
        :rtype: Optional[str]
        """

        rr = self._compute_rr(domain_name, record_name)

        # Combination search mode requires an exact match for the RR, value, and record type.
        # NOTE(review): the first returned record is trusted without re-checking its RR and
        # value locally -- confirm the exact-match guarantee against the API reference.
        request = alidns_models.DescribeDomainRecordsRequest(
            domain_name=domain_name,
            search_mode="COMBINATION",
            rrkey_word=rr,
            type="TXT",
            value_key_word=record_content,
        )

        try:
            response = self.client.describe_domain_records(request)
        except TeaException as e:
            logger.debug("Encountered Alibaba Cloud API error while getting TXT record_id: %s", e)
            return None

        records = response.body.domain_records.record
        if records:
            return records[0].record_id
        logger.debug("Unable to find TXT record.")
        return None

    @staticmethod
    def _compute_rr(domain: str, full_record_name: str) -> str:
        """
        Compute the host record (RR) of a record name relative to its registered domain.

        :param str domain: The domain name registered on Alibaba Cloud.
        :param str full_record_name: The fully qualified record name.
        :returns: ``full_record_name`` with the trailing ``.<domain>`` removed, or ``@`` when
            the record name is the registered domain itself.
        :rtype: str
        """
        # The API addresses the zone apex as "@"; an empty host record is not a valid RR.
        if full_record_name == domain:
            return "@"
        # Split on the last occurrence so only the trailing ".<domain>" is dropped.
        return full_record_name.rpartition("." + domain)[0]