"""Tests for certbot_dns_alibabacloud._internal.dns_alibabacloud.""" import sys import unittest from unittest import mock import pytest from certbot import errors from certbot.compat import os from certbot.plugins import dns_test_common from certbot.plugins.dns_test_common import DOMAIN from certbot.tests import util as test_util from Tea.exceptions import TeaException def _make_api_error(code: str) -> TeaException: return TeaException({"code": code, "message": ""}) API_ERROR = _make_api_error("InternalError") ACCESS_KEY_ID = "LTAI5txxxxxxxxxxxxxxxxxx" ACCESS_KEY_SECRET = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyy" class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthenticatorTest): def setUp(self) -> None: from certbot_dns_alibabacloud._internal.dns_alibabacloud import Authenticator super().setUp() path = os.path.join(self.tempdir, "file.ini") dns_test_common.write( { "alibabacloud_access_key_id": ACCESS_KEY_ID, "alibabacloud_access_key_secret": ACCESS_KEY_SECRET, }, path, ) self.config = mock.MagicMock( alibabacloud_credentials=path, alibabacloud_propagation_seconds=0, # don't wait during tests ) self.auth = Authenticator(self.config, "alibabacloud") self.mock_client = mock.MagicMock() setattr( self.auth, "_get_alibabacloud_client", mock.MagicMock(return_value=self.mock_client) ) @test_util.patch_display_util() def test_perform(self, unused_mock_get_utility) -> None: self.auth.perform([self.achall]) expected = [ mock.call.add_txt_record(DOMAIN, "_acme-challenge." + DOMAIN, mock.ANY, mock.ANY) ] assert expected == self.mock_client.mock_calls @test_util.patch_display_util() def test_cleanup(self, unused_mock_get_utility) -> None: self.auth._attempt_cleanup = True self.auth.cleanup([self.achall]) expected = [mock.call.del_txt_record(DOMAIN, "_acme-challenge." + DOMAIN, mock.ANY)] assert expected == self.mock_client.mock_calls def test_no_creds(self) -> None: dns_test_common.write({}, self.config.alibabacloud_credentials) with pytest.raises(errors.PluginError): self.auth.perform([self.achall]) def _mock_domain_response(domain: str | None) -> mock.MagicMock: """Create a mock response for the DescribeDomains API.""" response = mock.MagicMock() response.body.domains.domain = [mock.MagicMock(domain_name=domain)] return response def _mock_record_response(record_id: str | None) -> mock.MagicMock: """Create a mock response for the DescribeDomainRecords API.""" response = mock.MagicMock() response.body.domain_records.record = [mock.MagicMock(record_id=record_id)] return response class AlibabaCloudClientTest(unittest.TestCase): record_prefix = "_acme-challenge" record_name = record_prefix + "." + DOMAIN record_content = "foo" record_ttl = 600 domain_name = DOMAIN record_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" def setUp(self) -> None: from certbot_dns_alibabacloud._internal.dns_alibabacloud import _AlibabaCloudClient self.ac = _AlibabaCloudClient(ACCESS_KEY_ID, ACCESS_KEY_SECRET) self.client = mock.MagicMock() self.ac.client = self.client def test_add_txt_record(self) -> None: self.client.describe_domains.return_value = _mock_domain_response(DOMAIN) self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) called_request = self.client.add_domain_record.call_args.args[0] assert called_request.domain_name == DOMAIN assert called_request.rr == self.record_prefix assert called_request.type == "TXT" assert called_request.value == self.record_content assert called_request.ttl == self.record_ttl def test_add_txt_record_error(self) -> None: self.client.describe_domains.return_value = _mock_domain_response(DOMAIN) self.client.add_domain_record.side_effect = API_ERROR with pytest.raises(errors.PluginError): self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) def test_add_txt_record_error_during_domain_lookup(self) -> None: self.client.describe_domains.side_effect = API_ERROR with pytest.raises(errors.PluginError): self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) def test_add_txt_record_domain_not_found(self) -> None: self.client.describe_domains.return_value = _mock_domain_response(None) with pytest.raises(errors.PluginError): self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) def test_add_txt_record_bad_creds(self) -> None: self.client.describe_domains.side_effect = _make_api_error("InvalidAccessKeyId.NotFound") with pytest.raises(errors.PluginError): self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) self.client.describe_domains.side_effect = _make_api_error("SignatureDoesNotMatch") with pytest.raises(errors.PluginError): self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) self.client.describe_domains.side_effect = _make_api_error("InvalidAccessKeyId.Inactive") with pytest.raises(errors.PluginError): self.ac.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) def test_del_txt_record(self) -> None: self.client.describe_domains.return_value = _mock_domain_response(DOMAIN) self.client.describe_domain_records.return_value = _mock_record_response(self.record_id) self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content) self.client.describe_domains.assert_called_once() self.client.describe_domain_records.assert_called_once() self.client.delete_domain_record.assert_called_once() called_request = self.client.describe_domain_records.call_args.args[0] assert called_request.domain_name == DOMAIN assert called_request.type == "TXT" assert called_request.rrkey_word == self.record_prefix assert called_request.value_key_word == self.record_content assert called_request.search_mode == "COMBINATION" called_request = self.client.delete_domain_record.call_args.args[0] assert called_request.record_id == self.record_id def test_del_txt_record_domain_not_found(self) -> None: self.client.describe_domains.return_value = _mock_domain_response(None) self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content) self.client.describe_domains.assert_called_once() def test_del_txt_record_error_during_domain_lookup(self) -> None: self.client.describe_domains.side_effect = API_ERROR self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content) def test_del_txt_record_no_record(self) -> None: self.client.describe_domains.return_value = _mock_domain_response(DOMAIN) self.client.describe_domain_records.return_value = _mock_record_response(None) self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content) self.client.describe_domain_records.assert_called_once() self.client.delete_domain_record.assert_not_called() def test_del_txt_record_error_during_record_lookup(self) -> None: self.client.describe_domains.return_value = _mock_domain_response(DOMAIN) self.client.describe_domain_records.side_effect = API_ERROR self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content) self.client.describe_domain_records.assert_called_once() self.client.delete_domain_record.assert_not_called() def test_del_txt_record_error_during_deletion(self) -> None: self.client.describe_domains.return_value = _mock_domain_response(DOMAIN) self.client.delete_domain_record.side_effect = API_ERROR self.ac.del_txt_record(DOMAIN, self.record_name, self.record_content) self.client.delete_domain_record.assert_called_once() if __name__ == "__main__": sys.exit(pytest.main(sys.argv[1:] + [__file__]))