diff options
-rw-r--r-- | awx/main/tests/unit/test_tasks.py | 262 |
1 files changed, 0 insertions, 262 deletions
diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index b960c80d34..9ac278b568 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- -import configparser import json import os import shutil @@ -856,205 +855,6 @@ class TestJobCredentials(TestJobExecution): assert '--vault-id dev@prompt' in ' '.join(args) assert '--vault-id prod@prompt' in ' '.join(args) - @pytest.mark.parametrize("verify", (True, False)) - def test_k8s_credential(self, job, private_data_dir, verify, mock_me): - k8s = CredentialType.defaults['kubernetes_bearer_token']() - inputs = { - 'host': 'https://example.org/', - 'bearer_token': 'token123', - } - if verify: - inputs['verify_ssl'] = True - inputs['ssl_ca_cert'] = 'CERTDATA' - credential = Credential( - pk=1, - credential_type=k8s, - inputs=inputs, - ) - credential.inputs['bearer_token'] = encrypt_field(credential, 'bearer_token') - job.credentials.add(credential) - - env = {} - safe_env = {} - credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir) - - assert env['K8S_AUTH_HOST'] == 'https://example.org/' - assert env['K8S_AUTH_API_KEY'] == 'token123' - - if verify: - assert env['K8S_AUTH_VERIFY_SSL'] == 'True' - local_path = to_host_path(env['K8S_AUTH_SSL_CA_CERT'], private_data_dir) - with open(local_path, 'r') as f: - cert = f.read() - assert cert == 'CERTDATA' - else: - assert env['K8S_AUTH_VERIFY_SSL'] == 'False' - assert 'K8S_AUTH_SSL_CA_CERT' not in env - - assert safe_env['K8S_AUTH_API_KEY'] == HIDDEN_PASSWORD - - def test_aws_cloud_credential(self, job, private_data_dir, mock_me): - aws = CredentialType.defaults['aws']() - credential = Credential(pk=1, credential_type=aws, inputs={'username': 'bob', 'password': 'secret'}) - credential.inputs['password'] = encrypt_field(credential, 'password') - job.credentials.add(credential) - - env = {} - safe_env = {} - credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir) - - assert env['AWS_ACCESS_KEY_ID'] == 'bob' - assert env['AWS_SECRET_ACCESS_KEY'] == 'secret' - assert 'AWS_SECURITY_TOKEN' not in env - assert safe_env['AWS_SECRET_ACCESS_KEY'] == HIDDEN_PASSWORD - - def test_aws_cloud_credential_with_sts_token(self, private_data_dir, job, mock_me): - aws = CredentialType.defaults['aws']() - credential = Credential(pk=1, credential_type=aws, inputs={'username': 'bob', 'password': 'secret', 'security_token': 'token'}) - for key in ('password', 'security_token'): - credential.inputs[key] = encrypt_field(credential, key) - job.credentials.add(credential) - - env = {} - safe_env = {} - credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir) - - assert env['AWS_ACCESS_KEY_ID'] == 'bob' - assert env['AWS_SECRET_ACCESS_KEY'] == 'secret' - assert env['AWS_SECURITY_TOKEN'] == 'token' - assert safe_env['AWS_SECRET_ACCESS_KEY'] == HIDDEN_PASSWORD - - @pytest.mark.parametrize("cred_env_var", ['GCE_CREDENTIALS_FILE_PATH', 'GOOGLE_APPLICATION_CREDENTIALS']) - def test_gce_credentials(self, cred_env_var, private_data_dir, job, mock_me): - gce = CredentialType.defaults['gce']() - credential = Credential(pk=1, credential_type=gce, inputs={'username': 'bob', 'project': 'some-project', 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY}) - credential.inputs['ssh_key_data'] = encrypt_field(credential, 'ssh_key_data') - job.credentials.add(credential) - - env = {} - safe_env = {} - credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir) - runner_path = env[cred_env_var] - local_path = to_host_path(runner_path, private_data_dir) - with open(local_path, 'rb') as f_host: - json_data = json.load(f_host) - assert json_data['type'] == 'service_account' - assert json_data['private_key'] == self.EXAMPLE_PRIVATE_KEY - assert json_data['client_email'] == 'bob' - assert json_data['project_id'] == 'some-project' - - def test_azure_rm_with_tenant(self, private_data_dir, job, mock_me): - azure = CredentialType.defaults['azure_rm']() - credential = Credential( - pk=1, credential_type=azure, inputs={'client': 'some-client', 'secret': 'some-secret', 'tenant': 'some-tenant', 'subscription': 'some-subscription'} - ) - credential.inputs['secret'] = encrypt_field(credential, 'secret') - job.credentials.add(credential) - - env = {} - safe_env = {} - credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir) - - assert env['AZURE_CLIENT_ID'] == 'some-client' - assert env['AZURE_SECRET'] == 'some-secret' - assert env['AZURE_TENANT'] == 'some-tenant' - assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription' - assert safe_env['AZURE_SECRET'] == HIDDEN_PASSWORD - - def test_azure_rm_with_password(self, private_data_dir, job, mock_me): - azure = CredentialType.defaults['azure_rm']() - credential = Credential( - pk=1, credential_type=azure, inputs={'subscription': 'some-subscription', 'username': 'bob', 'password': 'secret', 'cloud_environment': 'foobar'} - ) - credential.inputs['password'] = encrypt_field(credential, 'password') - job.credentials.add(credential) - - env = {} - safe_env = {} - credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir) - - assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription' - assert env['AZURE_AD_USER'] == 'bob' - assert env['AZURE_PASSWORD'] == 'secret' - assert env['AZURE_CLOUD_ENVIRONMENT'] == 'foobar' - assert safe_env['AZURE_PASSWORD'] == HIDDEN_PASSWORD - - def test_vmware_credentials(self, private_data_dir, job, mock_me): - vmware = CredentialType.defaults['vmware']() - credential = Credential(pk=1, credential_type=vmware, inputs={'username': 'bob', 'password': 'secret', 'host': 'https://example.org'}) - credential.inputs['password'] = encrypt_field(credential, 'password') - job.credentials.add(credential) - - env = {} - safe_env = {} - credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir) - - assert env['VMWARE_USER'] == 'bob' - assert env['VMWARE_PASSWORD'] == 'secret' - assert env['VMWARE_HOST'] == 'https://example.org' - assert safe_env['VMWARE_PASSWORD'] == HIDDEN_PASSWORD - - def test_openstack_credentials(self, private_data_dir, job, mock_me): - task = jobs.RunJob() - task.instance = job - openstack = CredentialType.defaults['openstack']() - credential = Credential( - pk=1, credential_type=openstack, inputs={'username': 'bob', 'password': 'secret', 'project': 'tenant-name', 'host': 'https://keystone.example.org'} - ) - credential.inputs['password'] = encrypt_field(credential, 'password') - job.credentials.add(credential) - - private_data_files, ssh_key_data = task.build_private_data_files(job, private_data_dir) - env = task.build_env(job, private_data_dir, private_data_files=private_data_files) - credential.credential_type.inject_credential(credential, env, {}, [], private_data_dir) - - config_loc = to_host_path(env['OS_CLIENT_CONFIG_FILE'], private_data_dir) - with open(config_loc, 'r') as f: - shade_config = f.read() - assert shade_config == '\n'.join( - [ - 'clouds:', - ' devstack:', - ' auth:', - ' auth_url: https://keystone.example.org', - ' password: secret', - ' project_name: tenant-name', - ' username: bob', - ' verify: true', - '', - ] - ) - - @pytest.mark.parametrize("ca_file", [None, '/path/to/some/file']) - def test_rhv_credentials(self, private_data_dir, job, ca_file, mock_me): - rhv = CredentialType.defaults['rhv']() - inputs = { - 'host': 'some-ovirt-host.example.org', - 'username': 'bob', - 'password': 'some-pass', - } - if ca_file: - inputs['ca_file'] = ca_file - credential = Credential(pk=1, credential_type=rhv, inputs=inputs) - credential.inputs['password'] = encrypt_field(credential, 'password') - job.credentials.add(credential) - - env = {} - safe_env = {} - credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir) - - config = configparser.ConfigParser() - host_path = to_host_path(env['OVIRT_INI_PATH'], private_data_dir) - config.read(host_path) - assert config.get('ovirt', 'ovirt_url') == 'some-ovirt-host.example.org' - assert config.get('ovirt', 'ovirt_username') == 'bob' - assert config.get('ovirt', 'ovirt_password') == 'some-pass' - if ca_file: - assert config.get('ovirt', 'ovirt_ca_file') == ca_file - else: - with pytest.raises(configparser.NoOptionError): - config.get('ovirt', 'ovirt_ca_file') - @pytest.mark.parametrize( 'authorize, expected_authorize', [ @@ -1089,68 +889,6 @@ class TestJobCredentials(TestJobExecution): assert f.read() == self.EXAMPLE_PRIVATE_KEY assert safe_env['ANSIBLE_NET_PASSWORD'] == HIDDEN_PASSWORD - def test_terraform_cloud_credentials(self, job, private_data_dir, mock_me): - terraform = CredentialType.defaults['terraform']() - hcl_config = ''' - backend "s3" { - bucket = "s3_sample_bucket" - key = "/tf_state/" - region = "us-east-1" - } - ''' - credential = Credential(pk=1, credential_type=terraform, inputs={'configuration': hcl_config}) - credential.inputs['configuration'] = encrypt_field(credential, 'configuration') - job.credentials.add(credential) - - env = {} - safe_env = {} - credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir) - - local_path = to_host_path(env['TF_BACKEND_CONFIG_FILE'], private_data_dir) - with open(local_path, 'r') as f: - config = f.read() - assert config == hcl_config - - def test_terraform_gcs_backend_credentials(self, job, private_data_dir, mock_me): - terraform = CredentialType.defaults['terraform']() - hcl_config = ''' - backend "gcs" { - bucket = "gce_storage" - } - ''' - gce_backend_credentials = ''' - { - "type": "service_account", - "project_id": "sample", - "private_key_id": "eeeeeeeeeeeeeeeeeeeeeeeeeee", - "private_key": "-----BEGIN PRIVATE KEY-----\naaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\n-----END PRIVATE KEY-----\n", - "client_email": "sample@sample.iam.gserviceaccount.com", - "client_id": "0123456789", - "auth_uri": "https://accounts.google.com/o/oauth2/auth", - "token_uri": "https://oauth2.googleapis.com/token", - "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", - "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/cloud-content-robot%40sample.iam.gserviceaccount.com", - } - ''' - credential = Credential(pk=1, credential_type=terraform, inputs={'configuration': hcl_config, 'gce_credentials': gce_backend_credentials}) - credential.inputs['configuration'] = encrypt_field(credential, 'configuration') - credential.inputs['gce_credentials'] = encrypt_field(credential, 'gce_credentials') - job.credentials.add(credential) - - env = {} - safe_env = {} - credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir) - - local_path = to_host_path(env['TF_BACKEND_CONFIG_FILE'], private_data_dir) - with open(local_path, 'r') as f: - config = f.read() - assert config == hcl_config - - credentials_path = to_host_path(env['GOOGLE_BACKEND_CREDENTIALS'], private_data_dir) - with open(credentials_path, 'r') as f: - credentials = f.read() - assert credentials == gce_backend_credentials - def test_multi_cloud(self, private_data_dir, mock_me): gce = CredentialType.defaults['gce']() gce_credential = Credential(pk=1, credential_type=gce, inputs={'username': 'bob', 'project': 'some-project', 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY}) |