diff options
Diffstat (limited to 'src/certbot_dns_alibabacloud/_internal/tests')
| -rw-r--r-- | src/certbot_dns_alibabacloud/_internal/tests/__init__.py | 1 | ||||
| -rw-r--r-- | src/certbot_dns_alibabacloud/_internal/tests/dns_alibabacloud_test.py | 204 |
2 files changed, 205 insertions, 0 deletions
diff --git a/src/certbot_dns_alibabacloud/_internal/tests/__init__.py b/src/certbot_dns_alibabacloud/_internal/tests/__init__.py new file mode 100644 index 0000000..d6c769f --- /dev/null +++ b/src/certbot_dns_alibabacloud/_internal/tests/__init__.py @@ -0,0 +1 @@ +"""certbot-dns-alibabacloud tests""" diff --git a/src/certbot_dns_alibabacloud/_internal/tests/dns_alibabacloud_test.py b/src/certbot_dns_alibabacloud/_internal/tests/dns_alibabacloud_test.py new file mode 100644 index 0000000..922547a --- /dev/null +++ b/src/certbot_dns_alibabacloud/_internal/tests/dns_alibabacloud_test.py @@ -0,0 +1,204 @@ +"""Tests for certbot_dns_alibabacloud._internal.dns_alibabacloud.""" + +import sys +import unittest +from unittest import mock + +import pytest +from certbot import errors +from certbot.compat import os +from certbot.plugins import dns_test_common +from certbot.plugins.dns_test_common import DOMAIN +from certbot.tests import util as test_util +from Tea.exceptions import TeaException + + +def _make_api_error(code: str) -> TeaException: + return TeaException({"code": code, "message": ""}) + + +API_ERROR = _make_api_error("InternalError") + +ACCESS_KEY_ID = "LTAI5txxxxxxxxxxxxxxxxxx" +ACCESS_KEY_SECRET = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyy" + + +class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthenticatorTest): + def setUp(self) -> None: + from certbot_dns_alibabacloud._internal.dns_alibabacloud import Authenticator + + super().setUp() + + path = os.path.join(self.tempdir, "file.ini") + dns_test_common.write( + { + "alibabacloud_access_key_id": ACCESS_KEY_ID, + "alibabacloud_access_key_secret": ACCESS_KEY_SECRET, + }, + path, + ) + + self.config = mock.MagicMock( + alibabacloud_credentials=path, + alibabacloud_propagation_seconds=0, # don't wait during tests + ) + + self.auth = Authenticator(self.config, "alibabacloud") + self.mock_client = mock.MagicMock() + setattr( + self.auth, "_get_alibabacloud_client", mock.MagicMock(return_value=self.mock_client) + ) + + @test_util.patch_display_util() + def test_perform(self, unused_mock_get_utility) -> None: + self.auth.perform([self.achall]) + + expected = [ + mock.call.add_txt_record(DOMAIN, "_acme-challenge." + DOMAIN, mock.ANY, mock.ANY) + ] + assert expected == self.mock_client.mock_calls + + @test_util.patch_display_util() + def test_cleanup(self, unused_mock_get_utility) -> None: + self.auth._attempt_cleanup = True + self.auth.cleanup([self.achall]) + + expected = [mock.call.del_txt_record(DOMAIN, "_acme-challenge." + DOMAIN, mock.ANY)] + assert expected == self.mock_client.mock_calls + + def test_no_creds(self) -> None: + dns_test_common.write({}, self.config.alibabacloud_credentials) + with pytest.raises(errors.PluginError): + self.auth.perform([self.achall]) + + +def _mock_domain_response(domain: str | None) -> mock.MagicMock: + """Create a mock response for the DescribeDomains API.""" + response = mock.MagicMock() + response.body.domains.domain = [mock.MagicMock(domain_name=domain)] + return response + + +def _mock_record_response(record_id: str | None) -> mock.MagicMock: + """Create a mock response for the DescribeDomainRecords API.""" + response = mock.MagicMock() + response.body.domain_records.record = [mock.MagicMock(record_id=record_id)] + return response + + +class AlibabaCloudClientTest(unittest.TestCase): + record_prefix = "_acme-challenge" + record_name = record_prefix + "." + DOMAIN + record_content = "foo" + record_ttl = 600 + domain_name = DOMAIN + record_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + def setUp(self) -> None: + from certbot_dns_alibabacloud._internal.dns_alibabacloud import _AlibabaCloudClient + + self.ac = _AlibabaCloudClient(ACCESS_KEY_ID, ACCESS_KEY_SECRET) + self.client = mock.MagicMock() + self.ac.client = self.client + + def test_add_txt_record(self) -> None: + self.client.describe_domains.return_value = _mock_domain_response(DOMAIN) + + self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + + called_request = self.client.add_domain_record.call_args.args[0] + assert called_request.domain_name == DOMAIN + assert called_request.rr == self.record_prefix + assert called_request.type == "TXT" + assert called_request.value == self.record_content + assert called_request.ttl == self.record_ttl + + def test_add_txt_record_error(self) -> None: + self.client.describe_domains.return_value = _mock_domain_response(DOMAIN) + self.client.add_domain_record.side_effect = API_ERROR + + with pytest.raises(errors.PluginError): + self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + + def test_add_txt_record_error_during_domain_lookup(self) -> None: + self.client.describe_domains.side_effect = API_ERROR + + with pytest.raises(errors.PluginError): + self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + + def test_add_txt_record_domain_not_found(self) -> None: + self.client.describe_domains.return_value = _mock_domain_response(None) + + with pytest.raises(errors.PluginError): + self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + + def test_add_txt_record_bad_creds(self) -> None: + self.client.describe_domains.side_effect = _make_api_error("InvalidAccessKeyId.NotFound") + with pytest.raises(errors.PluginError): + self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + + self.client.describe_domains.side_effect = _make_api_error("SignatureDoesNotMatch") + with pytest.raises(errors.PluginError): + self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + + self.client.describe_domains.side_effect = _make_api_error("InvalidAccessKeyId.Inactive") + with pytest.raises(errors.PluginError): + self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + + def test_del_txt_record(self) -> None: + self.client.describe_domains.return_value = _mock_domain_response(DOMAIN) + self.client.describe_domain_records.return_value = _mock_record_response(self.record_id) + + self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content) + + self.client.describe_domains.assert_called_once() + self.client.describe_domain_records.assert_called_once() + self.client.delete_domain_record.assert_called_once() + + called_request = self.client.describe_domain_records.call_args.args[0] + assert called_request.domain_name == DOMAIN + assert called_request.type == "TXT" + assert called_request.rrkey_word == self.record_prefix + assert called_request.value_key_word == self.record_content + assert called_request.search_mode == "COMBINATION" + + called_request = self.client.delete_domain_record.call_args.args[0] + assert called_request.record_id == self.record_id + + def test_del_txt_record_domain_not_found(self) -> None: + self.client.describe_domains.return_value = _mock_domain_response(None) + + self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content) + self.client.describe_domains.assert_called_once() + + def test_del_txt_record_error_during_domain_lookup(self) -> None: + self.client.describe_domains.side_effect = API_ERROR + + self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content) + + def test_del_txt_record_no_record(self) -> None: + self.client.describe_domains.return_value = _mock_domain_response(DOMAIN) + self.client.describe_domain_records.return_value = _mock_record_response(None) + + self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content) + self.client.describe_domain_records.assert_called_once() + self.client.delete_domain_record.assert_not_called() + + def test_del_txt_record_error_during_record_lookup(self) -> None: + self.client.describe_domains.return_value = _mock_domain_response(DOMAIN) + self.client.describe_domain_records.side_effect = API_ERROR + + self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content) + self.client.describe_domain_records.assert_called_once() + self.client.delete_domain_record.assert_not_called() + + def test_del_txt_record_error_during_deletion(self) -> None: + self.client.describe_domains.return_value = _mock_domain_response(DOMAIN) + self.client.delete_domain_record.side_effect = API_ERROR + + self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content) + self.client.delete_domain_record.assert_called_once() + + +if __name__ == "__main__": + sys.exit(pytest.main(sys.argv[1:] + [__file__])) |
