summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--awx/main/tests/unit/test_tasks.py262
1 files changed, 0 insertions, 262 deletions
diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py
index b960c80d34..9ac278b568 100644
--- a/awx/main/tests/unit/test_tasks.py
+++ b/awx/main/tests/unit/test_tasks.py
@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
-import configparser
import json
import os
import shutil
@@ -856,205 +855,6 @@ class TestJobCredentials(TestJobExecution):
assert '--vault-id dev@prompt' in ' '.join(args)
assert '--vault-id prod@prompt' in ' '.join(args)
- @pytest.mark.parametrize("verify", (True, False))
- def test_k8s_credential(self, job, private_data_dir, verify, mock_me):
- k8s = CredentialType.defaults['kubernetes_bearer_token']()
- inputs = {
- 'host': 'https://example.org/',
- 'bearer_token': 'token123',
- }
- if verify:
- inputs['verify_ssl'] = True
- inputs['ssl_ca_cert'] = 'CERTDATA'
- credential = Credential(
- pk=1,
- credential_type=k8s,
- inputs=inputs,
- )
- credential.inputs['bearer_token'] = encrypt_field(credential, 'bearer_token')
- job.credentials.add(credential)
-
- env = {}
- safe_env = {}
- credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
-
- assert env['K8S_AUTH_HOST'] == 'https://example.org/'
- assert env['K8S_AUTH_API_KEY'] == 'token123'
-
- if verify:
- assert env['K8S_AUTH_VERIFY_SSL'] == 'True'
- local_path = to_host_path(env['K8S_AUTH_SSL_CA_CERT'], private_data_dir)
- with open(local_path, 'r') as f:
- cert = f.read()
- assert cert == 'CERTDATA'
- else:
- assert env['K8S_AUTH_VERIFY_SSL'] == 'False'
- assert 'K8S_AUTH_SSL_CA_CERT' not in env
-
- assert safe_env['K8S_AUTH_API_KEY'] == HIDDEN_PASSWORD
-
- def test_aws_cloud_credential(self, job, private_data_dir, mock_me):
- aws = CredentialType.defaults['aws']()
- credential = Credential(pk=1, credential_type=aws, inputs={'username': 'bob', 'password': 'secret'})
- credential.inputs['password'] = encrypt_field(credential, 'password')
- job.credentials.add(credential)
-
- env = {}
- safe_env = {}
- credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
-
- assert env['AWS_ACCESS_KEY_ID'] == 'bob'
- assert env['AWS_SECRET_ACCESS_KEY'] == 'secret'
- assert 'AWS_SECURITY_TOKEN' not in env
- assert safe_env['AWS_SECRET_ACCESS_KEY'] == HIDDEN_PASSWORD
-
- def test_aws_cloud_credential_with_sts_token(self, private_data_dir, job, mock_me):
- aws = CredentialType.defaults['aws']()
- credential = Credential(pk=1, credential_type=aws, inputs={'username': 'bob', 'password': 'secret', 'security_token': 'token'})
- for key in ('password', 'security_token'):
- credential.inputs[key] = encrypt_field(credential, key)
- job.credentials.add(credential)
-
- env = {}
- safe_env = {}
- credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
-
- assert env['AWS_ACCESS_KEY_ID'] == 'bob'
- assert env['AWS_SECRET_ACCESS_KEY'] == 'secret'
- assert env['AWS_SECURITY_TOKEN'] == 'token'
- assert safe_env['AWS_SECRET_ACCESS_KEY'] == HIDDEN_PASSWORD
-
- @pytest.mark.parametrize("cred_env_var", ['GCE_CREDENTIALS_FILE_PATH', 'GOOGLE_APPLICATION_CREDENTIALS'])
- def test_gce_credentials(self, cred_env_var, private_data_dir, job, mock_me):
- gce = CredentialType.defaults['gce']()
- credential = Credential(pk=1, credential_type=gce, inputs={'username': 'bob', 'project': 'some-project', 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY})
- credential.inputs['ssh_key_data'] = encrypt_field(credential, 'ssh_key_data')
- job.credentials.add(credential)
-
- env = {}
- safe_env = {}
- credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
- runner_path = env[cred_env_var]
- local_path = to_host_path(runner_path, private_data_dir)
- with open(local_path, 'rb') as f_host:
- json_data = json.load(f_host)
- assert json_data['type'] == 'service_account'
- assert json_data['private_key'] == self.EXAMPLE_PRIVATE_KEY
- assert json_data['client_email'] == 'bob'
- assert json_data['project_id'] == 'some-project'
-
- def test_azure_rm_with_tenant(self, private_data_dir, job, mock_me):
- azure = CredentialType.defaults['azure_rm']()
- credential = Credential(
- pk=1, credential_type=azure, inputs={'client': 'some-client', 'secret': 'some-secret', 'tenant': 'some-tenant', 'subscription': 'some-subscription'}
- )
- credential.inputs['secret'] = encrypt_field(credential, 'secret')
- job.credentials.add(credential)
-
- env = {}
- safe_env = {}
- credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
-
- assert env['AZURE_CLIENT_ID'] == 'some-client'
- assert env['AZURE_SECRET'] == 'some-secret'
- assert env['AZURE_TENANT'] == 'some-tenant'
- assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription'
- assert safe_env['AZURE_SECRET'] == HIDDEN_PASSWORD
-
- def test_azure_rm_with_password(self, private_data_dir, job, mock_me):
- azure = CredentialType.defaults['azure_rm']()
- credential = Credential(
- pk=1, credential_type=azure, inputs={'subscription': 'some-subscription', 'username': 'bob', 'password': 'secret', 'cloud_environment': 'foobar'}
- )
- credential.inputs['password'] = encrypt_field(credential, 'password')
- job.credentials.add(credential)
-
- env = {}
- safe_env = {}
- credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
-
- assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription'
- assert env['AZURE_AD_USER'] == 'bob'
- assert env['AZURE_PASSWORD'] == 'secret'
- assert env['AZURE_CLOUD_ENVIRONMENT'] == 'foobar'
- assert safe_env['AZURE_PASSWORD'] == HIDDEN_PASSWORD
-
- def test_vmware_credentials(self, private_data_dir, job, mock_me):
- vmware = CredentialType.defaults['vmware']()
- credential = Credential(pk=1, credential_type=vmware, inputs={'username': 'bob', 'password': 'secret', 'host': 'https://example.org'})
- credential.inputs['password'] = encrypt_field(credential, 'password')
- job.credentials.add(credential)
-
- env = {}
- safe_env = {}
- credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
-
- assert env['VMWARE_USER'] == 'bob'
- assert env['VMWARE_PASSWORD'] == 'secret'
- assert env['VMWARE_HOST'] == 'https://example.org'
- assert safe_env['VMWARE_PASSWORD'] == HIDDEN_PASSWORD
-
- def test_openstack_credentials(self, private_data_dir, job, mock_me):
- task = jobs.RunJob()
- task.instance = job
- openstack = CredentialType.defaults['openstack']()
- credential = Credential(
- pk=1, credential_type=openstack, inputs={'username': 'bob', 'password': 'secret', 'project': 'tenant-name', 'host': 'https://keystone.example.org'}
- )
- credential.inputs['password'] = encrypt_field(credential, 'password')
- job.credentials.add(credential)
-
- private_data_files, ssh_key_data = task.build_private_data_files(job, private_data_dir)
- env = task.build_env(job, private_data_dir, private_data_files=private_data_files)
- credential.credential_type.inject_credential(credential, env, {}, [], private_data_dir)
-
- config_loc = to_host_path(env['OS_CLIENT_CONFIG_FILE'], private_data_dir)
- with open(config_loc, 'r') as f:
- shade_config = f.read()
- assert shade_config == '\n'.join(
- [
- 'clouds:',
- ' devstack:',
- ' auth:',
- ' auth_url: https://keystone.example.org',
- ' password: secret',
- ' project_name: tenant-name',
- ' username: bob',
- ' verify: true',
- '',
- ]
- )
-
- @pytest.mark.parametrize("ca_file", [None, '/path/to/some/file'])
- def test_rhv_credentials(self, private_data_dir, job, ca_file, mock_me):
- rhv = CredentialType.defaults['rhv']()
- inputs = {
- 'host': 'some-ovirt-host.example.org',
- 'username': 'bob',
- 'password': 'some-pass',
- }
- if ca_file:
- inputs['ca_file'] = ca_file
- credential = Credential(pk=1, credential_type=rhv, inputs=inputs)
- credential.inputs['password'] = encrypt_field(credential, 'password')
- job.credentials.add(credential)
-
- env = {}
- safe_env = {}
- credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
-
- config = configparser.ConfigParser()
- host_path = to_host_path(env['OVIRT_INI_PATH'], private_data_dir)
- config.read(host_path)
- assert config.get('ovirt', 'ovirt_url') == 'some-ovirt-host.example.org'
- assert config.get('ovirt', 'ovirt_username') == 'bob'
- assert config.get('ovirt', 'ovirt_password') == 'some-pass'
- if ca_file:
- assert config.get('ovirt', 'ovirt_ca_file') == ca_file
- else:
- with pytest.raises(configparser.NoOptionError):
- config.get('ovirt', 'ovirt_ca_file')
-
@pytest.mark.parametrize(
'authorize, expected_authorize',
[
@@ -1089,68 +889,6 @@ class TestJobCredentials(TestJobExecution):
assert f.read() == self.EXAMPLE_PRIVATE_KEY
assert safe_env['ANSIBLE_NET_PASSWORD'] == HIDDEN_PASSWORD
- def test_terraform_cloud_credentials(self, job, private_data_dir, mock_me):
- terraform = CredentialType.defaults['terraform']()
- hcl_config = '''
- backend "s3" {
- bucket = "s3_sample_bucket"
- key = "/tf_state/"
- region = "us-east-1"
- }
- '''
- credential = Credential(pk=1, credential_type=terraform, inputs={'configuration': hcl_config})
- credential.inputs['configuration'] = encrypt_field(credential, 'configuration')
- job.credentials.add(credential)
-
- env = {}
- safe_env = {}
- credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
-
- local_path = to_host_path(env['TF_BACKEND_CONFIG_FILE'], private_data_dir)
- with open(local_path, 'r') as f:
- config = f.read()
- assert config == hcl_config
-
- def test_terraform_gcs_backend_credentials(self, job, private_data_dir, mock_me):
- terraform = CredentialType.defaults['terraform']()
- hcl_config = '''
- backend "gcs" {
- bucket = "gce_storage"
- }
- '''
- gce_backend_credentials = '''
- {
- "type": "service_account",
- "project_id": "sample",
- "private_key_id": "eeeeeeeeeeeeeeeeeeeeeeeeeee",
- "private_key": "-----BEGIN PRIVATE KEY-----\naaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\n-----END PRIVATE KEY-----\n",
- "client_email": "sample@sample.iam.gserviceaccount.com",
- "client_id": "0123456789",
- "auth_uri": "https://accounts.google.com/o/oauth2/auth",
- "token_uri": "https://oauth2.googleapis.com/token",
- "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
- "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/cloud-content-robot%40sample.iam.gserviceaccount.com",
- }
- '''
- credential = Credential(pk=1, credential_type=terraform, inputs={'configuration': hcl_config, 'gce_credentials': gce_backend_credentials})
- credential.inputs['configuration'] = encrypt_field(credential, 'configuration')
- credential.inputs['gce_credentials'] = encrypt_field(credential, 'gce_credentials')
- job.credentials.add(credential)
-
- env = {}
- safe_env = {}
- credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
-
- local_path = to_host_path(env['TF_BACKEND_CONFIG_FILE'], private_data_dir)
- with open(local_path, 'r') as f:
- config = f.read()
- assert config == hcl_config
-
- credentials_path = to_host_path(env['GOOGLE_BACKEND_CREDENTIALS'], private_data_dir)
- with open(credentials_path, 'r') as f:
- credentials = f.read()
- assert credentials == gce_backend_credentials
-
def test_multi_cloud(self, private_data_dir, mock_me):
gce = CredentialType.defaults['gce']()
gce_credential = Credential(pk=1, credential_type=gce, inputs={'username': 'bob', 'project': 'some-project', 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY})