summaryrefslogtreecommitdiffstats
path: root/src/test/rgw
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/rgw')
-rw-r--r--src/test/rgw/bucket_notification/README.rst14
-rw-r--r--src/test/rgw/bucket_notification/__init__.py2
-rw-r--r--src/test/rgw/bucket_notification/api.py4
-rw-r--r--src/test/rgw/bucket_notification/requirements.txt2
-rw-r--r--src/test/rgw/bucket_notification/test_bn.py685
-rw-r--r--src/test/rgw/rgw_multi/tests.py26
-rw-r--r--src/test/rgw/test-rgw-common.sh6
-rwxr-xr-xsrc/test/rgw/test-rgw-multisite.sh58
-rw-r--r--src/test/rgw/test_log_backing.cc1
-rw-r--r--src/test/rgw/test_multi.py8
-rw-r--r--src/test/rgw/test_rgw_iam_policy.cc8
-rw-r--r--src/test/rgw/test_rgw_lua.cc178
-rw-r--r--src/test/rgw/test_rgw_posix_driver.cc7
13 files changed, 822 insertions, 177 deletions
diff --git a/src/test/rgw/bucket_notification/README.rst b/src/test/rgw/bucket_notification/README.rst
index 050a2e380c2..9e553cea432 100644
--- a/src/test/rgw/bucket_notification/README.rst
+++ b/src/test/rgw/bucket_notification/README.rst
@@ -25,6 +25,7 @@ we would need the following configuration file::
access_key = 1234567890
secret_key = pencil
+Add boto3 extension to the standard client: https://github.com/ceph/ceph/tree/main/examples/rgw/boto3#introduction.
===========
Kafka Tests
@@ -125,7 +126,7 @@ To run the Kafka security test, you also need to provide the test with the locat
RabbitMQ Tests
==============
-You need to install RabbitMQ in the following way::
+You need to install RabbitMQ, check supported platforms: https://www.rabbitmq.com/docs/platforms. For example, for Fedora::
sudo dnf install rabbitmq-server
@@ -133,13 +134,17 @@ Then you need to run the following command::
sudo chkconfig rabbitmq-server on
+Update rabbitmq-server configuration to allow access to the guest user from anywhere on the network. Uncomment or add line to rabbirmq configuration, usually `/etc/rabbitmq/rabbirmq.comf`::
+
+ loopback_user.guest = false
+
Finally, to start the RabbitMQ server you need to run the following command::
- sudo /sbin/service rabbitmq-server start
+ sudo systemctl start rabbitmq-server
To confirm that the RabbitMQ server is running you can run the following command to check the status of the server::
- sudo /sbin/service rabbitmq-server status
+ sudo systemctl status rabbitmq-server
After running `vstart.sh` and RabbitMQ server you're ready to run the AMQP tests::
@@ -147,7 +152,7 @@ After running `vstart.sh` and RabbitMQ server you're ready to run the AMQP tests
After running the tests you need to stop the vstart cluster (``/path/to/ceph/src/stop.sh``) and the RabbitMQ server by running the following command::
- sudo /sbin/service rabbitmq-server stop
+ sudo systemctl stop rabbitmq-server
To run the RabbitMQ SSL security tests use the following::
@@ -156,4 +161,3 @@ To run the RabbitMQ SSL security tests use the following::
During these tests, the test script will restart the RabbitMQ server with the correct security configuration (``sudo`` privileges will be needed).
For that reason it is not recommended to run the `amqp_ssl_test` tests, that assumes a manually configured rabbirmq server, in the same run as `amqp_test` tests,
that assume the rabbitmq daemon running on the host as a service.
-
diff --git a/src/test/rgw/bucket_notification/__init__.py b/src/test/rgw/bucket_notification/__init__.py
index 222af7489f3..97d6cf3c5a2 100644
--- a/src/test/rgw/bucket_notification/__init__.py
+++ b/src/test/rgw/bucket_notification/__init__.py
@@ -31,7 +31,7 @@ def setup():
global default_cluster
default_cluster = defaults.get("cluster")
-
+
version = defaults.get("version")
if version == "v1":
_, result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], default_cluster)
diff --git a/src/test/rgw/bucket_notification/api.py b/src/test/rgw/bucket_notification/api.py
index e7ec31f1711..e84aa16edc7 100644
--- a/src/test/rgw/bucket_notification/api.py
+++ b/src/test/rgw/bucket_notification/api.py
@@ -247,12 +247,16 @@ def delete_all_topics(conn, tenant, cluster):
if tenant == '':
topics_result = admin(['topic', 'list'], cluster)
topics_json = json.loads(topics_result[0])
+ if 'topics' not in topics_json:
+ topics_json = topics_json.get('result',{})
for topic in topics_json['topics']:
rm_result = admin(['topic', 'rm', '--topic', topic['name']], cluster)
print(rm_result)
else:
topics_result = admin(['topic', 'list', '--tenant', tenant], cluster)
topics_json = json.loads(topics_result[0])
+ if 'topics' not in topics_json:
+ topics_json = topics_json.get('result',{})
for topic in topics_json['topics']:
rm_result = admin(['topic', 'rm', '--tenant', tenant, '--topic', topic['name']], cluster)
print(rm_result)
diff --git a/src/test/rgw/bucket_notification/requirements.txt b/src/test/rgw/bucket_notification/requirements.txt
index a3cff2bedab..bb74eceedc3 100644
--- a/src/test/rgw/bucket_notification/requirements.txt
+++ b/src/test/rgw/bucket_notification/requirements.txt
@@ -1,4 +1,4 @@
-nose >=1.0.0
+nose-py3 >=1.0.0
boto >=2.6.0
boto3 >=1.0.0
configparser >=5.0.0
diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py
index bdb302511f4..665fbca7494 100644
--- a/src/test/rgw/bucket_notification/test_bn.py
+++ b/src/test/rgw/bucket_notification/test_bn.py
@@ -41,7 +41,7 @@ from .api import PSTopicS3, \
admin
from nose import SkipTest
-from nose.tools import assert_not_equal, assert_equal, assert_in, assert_true
+from nose.tools import assert_not_equal, assert_equal, assert_in, assert_not_in, assert_true
import boto.s3.tagging
# configure logging for the tests module
@@ -167,19 +167,19 @@ class HTTPPostHandler(BaseHTTPRequestHandler):
"""implementation of POST handler"""
content_length = int(self.headers['Content-Length'])
if content_length == 0:
- log.info('HTTP Server received iempty event')
+ log.info('HTTP Server received empty event')
self.send_response(200)
self.end_headers()
return
body = self.rfile.read(content_length)
if self.server.cloudevents:
- event = from_http(self.headers, body)
+ event = from_http(self.headers, body)
record = json.loads(body)['Records'][0]
assert_equal(event['specversion'], '1.0')
assert_equal(event['id'], record['responseElements']['x-amz-request-id'] + '.' + record['responseElements']['x-amz-id-2'])
assert_equal(event['source'], 'ceph:s3.' + record['awsRegion'] + '.' + record['s3']['bucket']['name'])
assert_equal(event['type'], 'com.amazonaws.' + record['eventName'])
- assert_equal(event['datacontenttype'], 'application/json')
+ assert_equal(event['datacontenttype'], 'application/json')
assert_equal(event['subject'], record['s3']['object']['key'])
assert_equal(parser.parse(event['time']), parser.parse(record['eventTime']))
log.info('HTTP Server received event: %s', str(body))
@@ -238,7 +238,7 @@ class HTTPServerWithEvents(ThreadingHTTPServer):
self.acquire_lock()
self.events.append(event)
self.lock.release()
-
+
def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]):
"""verify stored s3 records agains a list of keys"""
self.acquire_lock()
@@ -410,17 +410,25 @@ kafka_server = 'localhost'
class KafkaReceiver(object):
"""class for receiving and storing messages on a topic from the kafka broker"""
- def __init__(self, topic, security_type):
+ def __init__(self, topic, security_type, kafka_server='localhost'):
from kafka import KafkaConsumer
remaining_retries = 10
port = 9092
if security_type != 'PLAINTEXT':
security_type = 'SSL'
port = 9093
+
+ if kafka_server is None:
+ endpoint = "localhost" + ":" + str(port)
+ elif ":" not in kafka_server:
+ endpoint = kafka_server + ":" + str(port)
+ else:
+ endpoint = kafka_server
+
while remaining_retries > 0:
try:
- self.consumer = KafkaConsumer(topic,
- bootstrap_servers = kafka_server+':'+str(port),
+ self.consumer = KafkaConsumer(topic,
+ bootstrap_servers=endpoint,
security_protocol=security_type,
consumer_timeout_ms=16000,
auto_offset_reset='earliest')
@@ -468,9 +476,9 @@ def kafka_receiver_thread_runner(receiver):
print('Kafka receiver ended unexpectedly: ' + str(error))
-def create_kafka_receiver_thread(topic, security_type='PLAINTEXT'):
+def create_kafka_receiver_thread(topic, security_type='PLAINTEXT', kafka_brokers=None):
"""create kafka receiver and thread"""
- receiver = KafkaReceiver(topic, security_type)
+ receiver = KafkaReceiver(topic, security_type, kafka_server=kafka_brokers)
task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,))
task.daemon = True
return task, receiver
@@ -487,10 +495,6 @@ def stop_kafka_receiver(receiver, task):
def get_ip():
- return 'localhost'
-
-
-def get_ip_http():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# address should not be reachable
@@ -509,7 +513,7 @@ def connection():
conn = S3Connection(aws_access_key_id=vstart_access_key,
aws_secret_access_key=vstart_secret_key,
- is_secure=False, port=port_no, host=hostname,
+ is_secure=False, port=port_no, host=hostname,
calling_format='boto.s3.connection.OrdinaryCallingFormat')
return conn
@@ -523,7 +527,7 @@ def connection2():
conn = S3Connection(aws_access_key_id=vstart_access_key,
aws_secret_access_key=vstart_secret_key,
- is_secure=False, port=port_no, host=hostname,
+ is_secure=False, port=port_no, host=hostname,
calling_format='boto.s3.connection.OrdinaryCallingFormat')
return conn
@@ -547,7 +551,7 @@ def another_user(user=None, tenant=None, account=None):
conn = S3Connection(aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
- is_secure=False, port=get_config_port(), host=get_config_host(),
+ is_secure=False, port=get_config_port(), host=get_config_host(),
calling_format='boto.s3.connection.OrdinaryCallingFormat')
return conn, arn
@@ -667,13 +671,13 @@ def connect_random_user(tenant=''):
@attr('basic_test')
def test_ps_s3_topic_on_master():
""" test s3 topics set/get/delete on master """
-
+
tenant = 'kaboom'
conn = connect_random_user(tenant)
-
+
# make sure there are no leftover topics
delete_all_topics(conn, tenant, get_config_cluster())
-
+
zonegroup = get_config_zonegroup()
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
@@ -715,31 +719,28 @@ def test_ps_s3_topic_on_master():
assert_equal(status, 404)
# get the remaining 2 topics
- result, status = topic_conf1.get_list()
- assert_equal(status, 200)
- assert_equal(len(result['ListTopicsResponse']['ListTopicsResult']['Topics']['member']), 2)
+ list_topics(2, tenant)
# delete topics
- result = topic_conf2.del_config()
+ status = topic_conf2.del_config()
assert_equal(status, 200)
- result = topic_conf3.del_config()
+ status = topic_conf3.del_config()
assert_equal(status, 200)
# get topic list, make sure it is empty
- result, status = topic_conf1.get_list()
- assert_equal(result['ListTopicsResponse']['ListTopicsResult']['Topics'], None)
+ list_topics(0, tenant)
@attr('basic_test')
def test_ps_s3_topic_admin_on_master():
""" test s3 topics set/get/delete on master """
-
+
tenant = 'kaboom'
conn = connect_random_user(tenant)
-
+
# make sure there are no leftover topics
delete_all_topics(conn, tenant, get_config_cluster())
-
+
zonegroup = get_config_zonegroup()
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
@@ -828,18 +829,44 @@ def notification_configuration(with_cli):
'arn:aws:sns:' + zonegroup + '::' + topic_name)
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
- topic_conf_list = [{'Id': notification_name+'_1',
+ topic_conf_list = [
+ {
+ 'Id': notification_name+'_1',
'TopicArn': topic_arn,
- 'Events': ['s3:ObjectCreated:*']
- },
- {'Id': notification_name+'_2',
+ 'Events': ['s3:ObjectCreated:*'],
+ 'Filter': {
+ 'Key': {
+ 'FilterRules': [
+ {'Name': 'prefix', 'Value': 'test'},
+ {'Name': 'suffix', 'Value': 'txt'}
+ ]
+ }
+ }
+ },
+ {
+ 'Id': notification_name+'_2',
'TopicArn': topic_arn,
- 'Events': ['s3:ObjectRemoved:*']
- },
- {'Id': notification_name+'_3',
+ 'Events': ['s3:ObjectRemoved:*'],
+ 'Filter': {
+ 'Metadata': {
+ 'FilterRules': [
+ {'Name': 'x-amz-meta-foo', 'Value': 'bar'},
+ {'Name': 'x-amz-meta-hello', 'Value': 'world'}]
+ },
+ }
+ },
+ {
+ 'Id': notification_name+'_3',
'TopicArn': topic_arn,
- 'Events': []
- }]
+ 'Events': [],
+ 'Filter': {
+ 'Tags': {
+ 'FilterRules': [
+ {'Name': 'tag1', 'Value': 'value1'},
+ {'Name': 'tag2', 'Value': 'value2'}]
+ }
+ }
+ }]
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
@@ -1016,7 +1043,7 @@ def test_ps_s3_notification_filter_on_master():
""" test s3 notification filter on master """
hostname = get_ip()
-
+
conn = connection()
ps_zone = conn
@@ -1035,7 +1062,7 @@ def test_ps_s3_notification_filter_on_master():
# create s3 topic
endpoint_address = 'amqp://' + hostname
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
-
+
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
@@ -1285,7 +1312,7 @@ def test_ps_s3_notification_errors_on_master():
conn.delete_bucket(bucket_name)
-def notification_push(endpoint_type, conn, account=None, cloudevents=False):
+def notification_push(endpoint_type, conn, account=None, cloudevents=False, kafka_brokers=None):
""" test pushinging notification """
zonegroup = get_config_zonegroup()
# create bucket
@@ -1297,7 +1324,7 @@ def notification_push(endpoint_type, conn, account=None, cloudevents=False):
task = None
if endpoint_type == 'http':
# create random port for the http server
- host = get_ip_http()
+ host = get_ip()
port = random.randint(10000, 20000)
# start an http server in a separate thread
receiver = HTTPServerWithEvents((host, port), cloudevents=cloudevents)
@@ -1340,11 +1367,13 @@ def notification_push(endpoint_type, conn, account=None, cloudevents=False):
assert_equal(status/100, 2)
elif endpoint_type == 'kafka':
# start amqp receiver
- task, receiver = create_kafka_receiver_thread(topic_name)
+ task, receiver = create_kafka_receiver_thread(topic_name, kafka_brokers=kafka_brokers)
task.start()
endpoint_address = 'kafka://' + kafka_server
# without acks from broker
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
+ if kafka_brokers is not None:
+ endpoint_args += '&kafka-brokers=' + kafka_brokers
# create s3 topic
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
@@ -1562,6 +1591,20 @@ def test_notification_push_kafka():
notification_push('kafka', conn)
+@attr('kafka_failover')
+def test_notification_push_kafka_multiple_brokers_override():
+ """ test pushing kafka s3 notification on master """
+ conn = connection()
+ notification_push('kafka', conn, kafka_brokers='localhost:9092,localhost:19092')
+
+
+@attr('kafka_failover')
+def test_notification_push_kafka_multiple_brokers_append():
+ """ test pushing kafka s3 notification on master """
+ conn = connection()
+ notification_push('kafka', conn, kafka_brokers='localhost:19092')
+
+
@attr('http_test')
def test_ps_s3_notification_multi_delete_on_master():
""" test deletion of multiple keys on master """
@@ -1901,7 +1944,7 @@ def test_lifecycle_abort_mpu():
def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
""" test object creation s3 notifications in using put/copy/post on master"""
-
+
if not external_endpoint_address:
hostname = get_ip()
proc = init_rabbitmq()
@@ -2305,7 +2348,7 @@ def metadata_filter(endpoint_type, conn):
# create bucket
bucket_name = gen_bucket_name()
bucket = conn.create_bucket(bucket_name)
- topic_name = bucket_name + TOPIC_SUFFIX
+ topic_name = bucket_name + TOPIC_SUFFIX
# start endpoint receiver
host = get_ip()
@@ -2367,7 +2410,7 @@ def metadata_filter(endpoint_type, conn):
key_name = 'copy_of_foo'
bucket.copy_key(key_name, bucket.name, key.name)
expected_keys.append(key_name)
-
+
# create another objects in the bucket using COPY
# but override the metadata value
key_name = 'another_copy_of_foo'
@@ -2489,7 +2532,7 @@ def test_ps_s3_metadata_on_master():
# create objects in the bucket using COPY
key_name = 'copy_of_foo'
bucket.copy_key(key_name, bucket.name, key.name)
-
+
# create objects in the bucket using multi-part upload
fp = tempfile.NamedTemporaryFile(mode='w+b')
chunk_size = 1024*1024*5 # 5MB
@@ -2703,7 +2746,7 @@ def test_ps_s3_versioning_on_master():
if version not in versions:
print('version mismatch: '+version+' not in: '+str(versions))
# TODO: copy_key() does not return the version of the copied object
- #assert False
+ #assert False
else:
print('version ok: '+version+' in: '+str(versions))
@@ -2792,7 +2835,7 @@ def test_ps_s3_versioned_deletion_on_master():
size = event['s3']['object']['size']
if version not in versions:
print('version mismatch: '+version+' not in: '+str(versions))
- assert False
+ assert False
else:
print('version ok: '+version+' in: '+str(versions))
if event['eventName'] == 'ObjectRemoved:Delete':
@@ -2962,7 +3005,6 @@ def wait_for_queue_to_drain(topic_name, tenant=None, account=None, http_port=Non
log.info('waited for %ds for queue %s to drain', time_diff, topic_name)
-@attr('kafka_test')
def persistent_topic_stats(conn, endpoint_type):
zonegroup = get_config_zonegroup()
@@ -2974,12 +3016,13 @@ def persistent_topic_stats(conn, endpoint_type):
host = get_ip()
task = None
port = None
+ wrong_port = 1234
+ endpoint_address = endpoint_type+'://'+host+':'+str(wrong_port)
if endpoint_type == 'http':
# create random port for the http server
port = random.randint(10000, 20000)
# start an http server in a separate thread
receiver = HTTPServerWithEvents((host, port))
- endpoint_address = 'http://'+host+':'+str(port)
endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
'&retry_sleep_duration=1'
elif endpoint_type == 'amqp':
@@ -2987,23 +3030,18 @@ def persistent_topic_stats(conn, endpoint_type):
exchange = 'ex1'
task, receiver = create_amqp_receiver_thread(exchange, topic_name)
task.start()
- endpoint_address = 'amqp://' + host
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker&persistent=true'+ \
'&retry_sleep_duration=1'
elif endpoint_type == 'kafka':
# start kafka receiver
task, receiver = create_kafka_receiver_thread(topic_name)
task.start()
- endpoint_address = 'kafka://' + host
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
'&retry_sleep_duration=1'
else:
return SkipTest('Unknown endpoint type: ' + endpoint_type)
# create s3 topic
- endpoint_address = 'kafka://' + host + ':1234' # wrong port
- endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
- '&retry_sleep_duration=1'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
@@ -3051,9 +3089,19 @@ def persistent_topic_stats(conn, endpoint_type):
get_stats_persistent_topic(topic_name, 2 * number_of_objects)
# change the endpoint port
- endpoint_address = 'kafka://' + host
- endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
- '&retry_sleep_duration=1'
+ if endpoint_type == 'http':
+ endpoint_address = endpoint_type+'://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
+ '&retry_sleep_duration=1'
+ elif endpoint_type == 'amqp':
+ endpoint_address = endpoint_type+'://'+host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker&persistent=true'+ \
+ '&retry_sleep_duration=1'
+ elif endpoint_type == 'kafka':
+ endpoint_address = endpoint_type+'://'+host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
+ '&retry_sleep_duration=1'
+
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
@@ -3068,19 +3116,26 @@ def persistent_topic_stats(conn, endpoint_type):
@attr('http_test')
-def persistent_topic_stats_http():
+def test_persistent_topic_stats_http():
""" test persistent topic stats, http endpoint """
conn = connection()
persistent_topic_stats(conn, 'http')
@attr('kafka_test')
-def persistent_topic_stats_kafka():
+def test_persistent_topic_stats_kafka():
""" test persistent topic stats, kafka endpoint """
conn = connection()
persistent_topic_stats(conn, 'kafka')
+@attr('amqp_test')
+def test_persistent_topic_stats_amqp():
+ """ test persistent topic stats, amqp endpoint """
+ conn = connection()
+ persistent_topic_stats(conn, 'amqp')
+
+
@attr('kafka_test')
def test_persistent_topic_dump():
""" test persistent topic dump """
@@ -3182,9 +3237,6 @@ def ps_s3_persistent_topic_configs(persistency_time, config_dict):
host = get_ip()
port = random.randint(10000, 20000)
- # start an http server in a separate thread
- http_server = HTTPServerWithEvents((host, port))
-
# create bucket
bucket_name = gen_bucket_name()
bucket = conn.create_bucket(bucket_name)
@@ -3205,9 +3257,6 @@ def ps_s3_persistent_topic_configs(persistency_time, config_dict):
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
- delay = 20
- time.sleep(delay)
- http_server.close()
# topic get
parsed_result = get_topic(topic_name)
parsed_result_dest = parsed_result["dest"]
@@ -3232,10 +3281,13 @@ def ps_s3_persistent_topic_configs(persistency_time, config_dict):
print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
# topic stats
- get_stats_persistent_topic(topic_name, number_of_objects)
+ if time_diff > persistency_time:
+ log.warning('persistency time for topic %s already passed. not possible to check object in the queue', topic_name)
+ else:
+ get_stats_persistent_topic(topic_name, number_of_objects)
+ # wait as much as ttl and check if the persistent topics have expired
+ time.sleep(persistency_time)
- # wait as much as ttl and check if the persistent topics have expired
- time.sleep(persistency_time)
get_stats_persistent_topic(topic_name, 0)
# delete objects from the bucket
@@ -3247,18 +3299,18 @@ def ps_s3_persistent_topic_configs(persistency_time, config_dict):
thr = threading.Thread(target = key.delete, args=())
thr.start()
client_threads.append(thr)
- if count%100 == 0:
- [thr.join() for thr in client_threads]
- time_diff = time.time() - start_time
- print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
- client_threads = []
- start_time = time.time()
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
# topic stats
- get_stats_persistent_topic(topic_name, number_of_objects)
+ if time_diff > persistency_time:
+ log.warning('persistency time for topic %s already passed. not possible to check object in the queue', topic_name)
+ else:
+ get_stats_persistent_topic(topic_name, number_of_objects)
+ # wait as much as ttl and check if the persistent topics have expired
+ time.sleep(persistency_time)
- # wait as much as ttl and check if the persistent topics have expired
- time.sleep(persistency_time)
get_stats_persistent_topic(topic_name, 0)
# cleanup
@@ -3266,7 +3318,6 @@ def ps_s3_persistent_topic_configs(persistency_time, config_dict):
topic_conf.del_config()
# delete the bucket
conn.delete_bucket(bucket_name)
- time.sleep(delay)
def create_persistency_config_string(config_dict):
str_ret = ""
@@ -3281,7 +3332,7 @@ def test_ps_s3_persistent_topic_configs_ttl():
""" test persistent topic configurations with time_to_live """
config_dict = {"time_to_live": 30, "max_retries": "None", "retry_sleep_duration": "None"}
buffer = 10
- persistency_time =config_dict["time_to_live"] + buffer
+ persistency_time = config_dict["time_to_live"] + buffer
ps_s3_persistent_topic_configs(persistency_time, config_dict)
@@ -3388,7 +3439,7 @@ def test_ps_s3_notification_kafka_idle_behaviour():
# name is constant for manual testing
topic_name = bucket_name+'_topic'
# create consumer on the topic
-
+
task, receiver = create_kafka_receiver_thread(topic_name+'_1')
task.start()
@@ -3833,7 +3884,7 @@ def persistent_notification(endpoint_type, conn, account=None):
task = None
if endpoint_type == 'http':
# create random port for the http server
- host = get_ip_http()
+ host = get_ip()
port = random.randint(10000, 20000)
# start an http server in a separate thread
receiver = HTTPServerWithEvents((host, port))
@@ -4059,7 +4110,7 @@ def test_ps_s3_topic_update():
amqp_task.start()
#topic_conf = PSTopic(ps_zone.conn, topic_name,endpoint='amqp://' + hostname,endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
-
+
topic_arn = topic_conf.set_config()
#result, status = topic_conf.set_config()
#assert_equal(status/100, 2)
@@ -4323,7 +4374,7 @@ def test_ps_s3_multiple_topics_notification():
keys = list(bucket.list())
# TODO: use exact match
verify_s3_records_by_elements(records, keys, exact_match=False)
- receiver.verify_s3_events(keys, exact_match=False)
+ receiver.verify_s3_events(keys, exact_match=False)
result, _ = sub_conf2.get_events()
parsed_result = json.loads(result)
for record in parsed_result['Records']:
@@ -4344,6 +4395,242 @@ def test_ps_s3_multiple_topics_notification():
http_server.close()
+@attr('data_path_v2_test')
+def test_ps_s3_list_topics_migration():
+ """ test list topics on migration"""
+ if get_config_cluster() == 'noname':
+ return SkipTest('realm is needed for migration test')
+
+ # Initialize connections and configurations
+ conn1 = connection()
+ tenant = 'kaboom1'
+ conn2 = connect_random_user(tenant)
+ bucket_name = gen_bucket_name()
+ topics = [f"{bucket_name}{TOPIC_SUFFIX}{i}" for i in range(1, 7)]
+ tenant_topics = [f"{tenant}_{topic}" for topic in topics]
+
+ # Define topic names with version
+ topic_versions = {
+ "topic1_v2": f"{topics[0]}_v2",
+ "topic2_v2": f"{topics[1]}_v2",
+ "topic3_v1": f"{topics[2]}_v1",
+ "topic4_v1": f"{topics[3]}_v1",
+ "topic5_v1": f"{topics[4]}_v1",
+ "topic6_v1": f"{topics[5]}_v1",
+ "tenant_topic1_v2": f"{tenant_topics[0]}_v2",
+ "tenant_topic2_v1": f"{tenant_topics[1]}_v1",
+ "tenant_topic3_v1": f"{tenant_topics[2]}_v1"
+ }
+
+ # Get necessary configurations
+ host = get_ip()
+ http_port = random.randint(10000, 20000)
+ endpoint_address = 'http://' + host + ':' + str(http_port)
+ endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
+ zonegroup = get_config_zonegroup()
+ conf_cluster = get_config_cluster()
+
+ # Make sure there are no leftover topics on v2
+ zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
+ delete_all_topics(conn1, '', conf_cluster)
+ delete_all_topics(conn2, tenant, conf_cluster)
+
+ # Start v1 notification
+ # Make sure there are no leftover topics on v1
+ zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
+ delete_all_topics(conn1, '', conf_cluster)
+ delete_all_topics(conn2, tenant, conf_cluster)
+
+ # Create s3 - v1 topics
+ topic_conf = PSTopicS3(conn1, topic_versions['topic3_v1'], zonegroup, endpoint_args=endpoint_args)
+ topic_arn3 = topic_conf.set_config()
+ topic_conf = PSTopicS3(conn1, topic_versions['topic4_v1'], zonegroup, endpoint_args=endpoint_args)
+ topic_arn4 = topic_conf.set_config()
+ topic_conf = PSTopicS3(conn1, topic_versions['topic5_v1'], zonegroup, endpoint_args=endpoint_args)
+ topic_arn5 = topic_conf.set_config()
+ topic_conf = PSTopicS3(conn1, topic_versions['topic6_v1'], zonegroup, endpoint_args=endpoint_args)
+ topic_arn6 = topic_conf.set_config()
+ tenant_topic_conf = PSTopicS3(conn2, topic_versions['tenant_topic2_v1'], zonegroup, endpoint_args=endpoint_args)
+ tenant_topic_arn2 = tenant_topic_conf.set_config()
+ tenant_topic_conf = PSTopicS3(conn2, topic_versions['tenant_topic3_v1'], zonegroup, endpoint_args=endpoint_args)
+ tenant_topic_arn3 = tenant_topic_conf.set_config()
+
+ # Start v2 notification
+ zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
+
+ # Create s3 - v2 topics
+ topic_conf = PSTopicS3(conn1, topic_versions['topic1_v2'], zonegroup, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf.set_config()
+ topic_conf = PSTopicS3(conn1, topic_versions['topic2_v2'], zonegroup, endpoint_args=endpoint_args)
+ topic_arn2 = topic_conf.set_config()
+ tenant_topic_conf = PSTopicS3(conn2, topic_versions['tenant_topic1_v2'], zonegroup, endpoint_args=endpoint_args)
+ tenant_topic_arn1 = tenant_topic_conf.set_config()
+
+ # Verify topics list
+ try:
+ # Verify no tenant topics
+ res, status = topic_conf.get_list()
+ assert_equal(status // 100, 2)
+ listTopicsResponse = res.get('ListTopicsResponse', {})
+ listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
+ topics = listTopicsResult.get('Topics', {})
+ member = topics['member'] if topics else []
+ assert_equal(len(member), 6)
+
+ # Verify tenant topics
+ res, status = tenant_topic_conf.get_list()
+ assert_equal(status // 100, 2)
+ listTopicsResponse = res.get('ListTopicsResponse', {})
+ listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
+ topics = listTopicsResult.get('Topics', {})
+ member = topics['member'] if topics else []
+ assert_equal(len(member), 3)
+ finally:
+ # Cleanup created topics
+ topic_conf.del_config(topic_arn1)
+ topic_conf.del_config(topic_arn2)
+ topic_conf.del_config(topic_arn3)
+ topic_conf.del_config(topic_arn4)
+ topic_conf.del_config(topic_arn5)
+ topic_conf.del_config(topic_arn6)
+ tenant_topic_conf.del_config(tenant_topic_arn1)
+ tenant_topic_conf.del_config(tenant_topic_arn2)
+ tenant_topic_conf.del_config(tenant_topic_arn3)
+
+
+@attr('basic_test')
+def test_ps_s3_list_topics():
+ """ test list topics"""
+
+ # Initialize connections, topic names and configurations
+ conn1 = connection()
+ tenant = 'kaboom1'
+ conn2 = connect_random_user(tenant)
+ bucket_name = gen_bucket_name()
+ topic_name1 = bucket_name + TOPIC_SUFFIX + '1'
+ topic_name2 = bucket_name + TOPIC_SUFFIX + '2'
+ topic_name3 = bucket_name + TOPIC_SUFFIX + '3'
+ tenant_topic_name1 = tenant + "_" + topic_name1
+ tenant_topic_name2 = tenant + "_" + topic_name2
+ host = get_ip()
+ http_port = random.randint(10000, 20000)
+ endpoint_address = 'http://' + host + ':' + str(http_port)
+ endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
+ zonegroup = get_config_zonegroup()
+
+ # Make sure there are no leftover topics
+ delete_all_topics(conn1, '', get_config_cluster())
+ delete_all_topics(conn2, tenant, get_config_cluster())
+
+ # Create s3 - v2 topics
+ topic_conf = PSTopicS3(conn1, topic_name1, zonegroup, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf.set_config()
+ topic_conf = PSTopicS3(conn1, topic_name2, zonegroup, endpoint_args=endpoint_args)
+ topic_arn2 = topic_conf.set_config()
+ topic_conf = PSTopicS3(conn1, topic_name3, zonegroup, endpoint_args=endpoint_args)
+ topic_arn3 = topic_conf.set_config()
+ tenant_topic_conf = PSTopicS3(conn2, tenant_topic_name1, zonegroup, endpoint_args=endpoint_args)
+ tenant_topic_arn1 = tenant_topic_conf.set_config()
+ tenant_topic_conf = PSTopicS3(conn2, tenant_topic_name2, zonegroup, endpoint_args=endpoint_args)
+ tenant_topic_arn2 = tenant_topic_conf.set_config()
+
+ # Verify topics list
+ try:
+ # Verify no tenant topics
+ res, status = topic_conf.get_list()
+ assert_equal(status // 100, 2)
+ listTopicsResponse = res.get('ListTopicsResponse', {})
+ listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
+ topics = listTopicsResult.get('Topics', {})
+ member = topics['member'] if topics else [] # version 2
+ assert_equal(len(member), 3)
+
+ # Verify topics for tenant
+ res, status = tenant_topic_conf.get_list()
+ assert_equal(status // 100, 2)
+ listTopicsResponse = res.get('ListTopicsResponse', {})
+ listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
+ topics = listTopicsResult.get('Topics', {})
+ member = topics['member'] if topics else []
+ assert_equal(len(member), 2)
+ finally:
+ # Cleanup created topics
+ topic_conf.del_config(topic_arn1)
+ topic_conf.del_config(topic_arn2)
+ topic_conf.del_config(topic_arn3)
+ tenant_topic_conf.del_config(tenant_topic_arn1)
+ tenant_topic_conf.del_config(tenant_topic_arn2)
+
+@attr('data_path_v2_test')
+def test_ps_s3_list_topics_v1():
+ """ test list topics on v1"""
+ if get_config_cluster() == 'noname':
+ return SkipTest('realm is needed')
+
+ # Initialize connections and configurations
+ conn1 = connection()
+ tenant = 'kaboom1'
+ conn2 = connect_random_user(tenant)
+ bucket_name = gen_bucket_name()
+ topic_name1 = bucket_name + TOPIC_SUFFIX + '1'
+ topic_name2 = bucket_name + TOPIC_SUFFIX + '2'
+ topic_name3 = bucket_name + TOPIC_SUFFIX + '3'
+ tenant_topic_name1 = tenant + "_" + topic_name1
+ tenant_topic_name2 = tenant + "_" + topic_name2
+ host = get_ip()
+ http_port = random.randint(10000, 20000)
+ endpoint_address = 'http://' + host + ':' + str(http_port)
+ endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
+ zonegroup = get_config_zonegroup()
+ conf_cluster = get_config_cluster()
+
+ # Make sure there are no leftover topics
+ delete_all_topics(conn1, '', conf_cluster)
+ delete_all_topics(conn2, tenant, conf_cluster)
+
+ # Make sure that we disable v2
+ zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
+
+ # Create s3 - v1 topics
+ topic_conf = PSTopicS3(conn1, topic_name1, zonegroup, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf.set_config()
+ topic_conf = PSTopicS3(conn1, topic_name2, zonegroup, endpoint_args=endpoint_args)
+ topic_arn2 = topic_conf.set_config()
+ topic_conf = PSTopicS3(conn1, topic_name3, zonegroup, endpoint_args=endpoint_args)
+ topic_arn3 = topic_conf.set_config()
+ tenant_topic_conf = PSTopicS3(conn2, tenant_topic_name1, zonegroup, endpoint_args=endpoint_args)
+ tenant_topic_arn1 = tenant_topic_conf.set_config()
+ tenant_topic_conf = PSTopicS3(conn2, tenant_topic_name2, zonegroup, endpoint_args=endpoint_args)
+ tenant_topic_arn2 = tenant_topic_conf.set_config()
+
+ # Verify topics list
+ try:
+ # Verify no tenant topics
+ res, status = topic_conf.get_list()
+ assert_equal(status // 100, 2)
+ listTopicsResponse = res.get('ListTopicsResponse', {})
+ listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
+ topics = listTopicsResult.get('Topics', {})
+ member = topics['member'] if topics else []
+ assert_equal(len(member), 3)
+
+ # Verify tenant topics
+ res, status = tenant_topic_conf.get_list()
+ assert_equal(status // 100, 2)
+ listTopicsResponse = res.get('ListTopicsResponse', {})
+ listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
+ topics = listTopicsResult.get('Topics', {})
+ member = topics['member'] if topics else []
+ assert_equal(len(member), 2)
+ finally:
+ # Cleanup created topics
+ topic_conf.del_config(topic_arn1)
+ topic_conf.del_config(topic_arn2)
+ topic_conf.del_config(topic_arn3)
+ tenant_topic_conf.del_config(tenant_topic_arn1)
+ tenant_topic_conf.del_config(tenant_topic_arn2)
+
+
@attr('basic_test')
def test_ps_s3_topic_permissions():
""" test s3 topic set/get/delete permissions """
@@ -4461,7 +4748,7 @@ def test_ps_s3_topic_no_permissions():
zonegroup = 'default'
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
-
+
# create s3 topic without policy
endpoint_address = 'amqp://127.0.0.1:7001'
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
@@ -4507,7 +4794,7 @@ def test_ps_s3_topic_no_permissions():
s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
_, status = s3_notification_conf2.set_config()
assert_equal(status, 200)
-
+
try:
# 2nd user tries to delete the topic
status = topic_conf2.del_config(topic_arn=topic_arn)
@@ -4561,11 +4848,11 @@ def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=F
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
-
+
# create consumer on the topic
task, receiver = create_kafka_receiver_thread(topic_name)
task.start()
-
+
topic_arn = topic_conf.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -5115,7 +5402,7 @@ def test_ps_s3_data_path_v2_mixed_migration():
tenants_list.append('')
# make sure there are no leftover topics
delete_all_topics(conn, '', get_config_cluster())
-
+
# make sure that we start at v2
zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
@@ -5411,3 +5698,219 @@ def test_connection_caching():
conn.delete_bucket(bucket_name)
receiver_1.close(task_1)
receiver_2.close(task_2)
+
+
+@attr("http_test")
+def test_topic_migration_to_an_account():
+ """test the topic migration procedure described at
+ https://docs.ceph.com/en/latest/radosgw/account/#migrate-an-existing-user-into-an-account
+ """
+ try:
+ # create an http server for notification delivery
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ http_server = HTTPServerWithEvents((host, port))
+
+ # start with two non-account users
+ # create a new user for "user1" which is going to be migrated to an account
+ user1_conn, user1_arn = another_user()
+ user1_id = user1_arn.split("/")[1]
+ user2_conn = connection()
+ log.info(
+ f"two non-account users with acckeys user1 {user1_conn.access_key} and user2 {user2_conn.access_key}"
+ )
+
+ # create a bucket per user
+ user1_bucket_name = gen_bucket_name()
+ user2_bucket_name = gen_bucket_name()
+ user1_bucket = user1_conn.create_bucket(user1_bucket_name)
+ user2_bucket = user2_conn.create_bucket(user2_bucket_name)
+ log.info(
+ f"one bucket per user {user1_conn.access_key}: {user1_bucket_name} and {user2_conn.access_key}: {user2_bucket_name}"
+ )
+
+ # create an S3 topic owned by the first user
+ topic_name = user1_bucket_name + TOPIC_SUFFIX
+ zonegroup = get_config_zonegroup()
+ endpoint_address = "http://" + host + ":" + str(port)
+ endpoint_args = "push-endpoint=" + endpoint_address + "&persistent=true"
+ expected_topic_arn = "arn:aws:sns:" + zonegroup + "::" + topic_name
+ topic_conf = PSTopicS3(
+ user1_conn, topic_name, zonegroup, endpoint_args=endpoint_args
+ )
+ topic_arn = topic_conf.set_config()
+ assert_equal(topic_arn, expected_topic_arn)
+ log.info(
+ f"{user1_conn.access_key} created the topic {topic_arn} with args {endpoint_args}"
+ )
+
+ # both buckets subscribe to the same and only topic using the same notification id
+ notification_name = user1_bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [
+ {
+ "Id": notification_name,
+ "TopicArn": topic_arn,
+ "Events": ["s3:ObjectCreated:*"],
+ }
+ ]
+ s3_notification_conf1 = PSNotificationS3(
+ user1_conn, user1_bucket_name, topic_conf_list
+ )
+ s3_notification_conf2 = PSNotificationS3(
+ user2_conn, user2_bucket_name, topic_conf_list
+ )
+ _, status = s3_notification_conf1.set_config()
+ assert_equal(status / 100, 2)
+ _, status = s3_notification_conf2.set_config()
+ assert_equal(status / 100, 2)
+ # verify both buckets are subscribed to the topic
+ rgw_topic_entry = [
+ t for t in list_topics()["topics"] if t["name"] == topic_name
+ ]
+ assert_equal(len(rgw_topic_entry), 1)
+ subscribed_buckets = rgw_topic_entry[0]["subscribed_buckets"]
+ assert_equal(len(subscribed_buckets), 2)
+ assert_in(user1_bucket_name, subscribed_buckets)
+ assert_in(user2_bucket_name, subscribed_buckets)
+ log.info(
+ "buckets {user1_bucket_name} and {user2_bucket_name} are subscribed to {topic_arn}"
+ )
+
+ # move user1 to an account
+ account_id = "RGW98765432101234567"
+ cmd = ["account", "create", "--account-id", account_id]
+ _, rc = admin(cmd, get_config_cluster())
+ assert rc == 0, f"failed to create {account_id}: {rc}"
+ cmd = [
+ "user",
+ "modify",
+ "--uid",
+ user1_id,
+ "--account-id",
+ account_id,
+ "--account-root",
+ ]
+ _, rc = admin(cmd, get_config_cluster())
+ assert rc == 0, f"failed to modify {user1_id}: {rc}"
+ log.info(
+ f"{user1_conn.access_key}/{user1_id} is migrated to account {account_id} as root user"
+ )
+
+ # verify the topic is functional
+ user1_bucket.new_key("user1obj1").set_contents_from_string("object content")
+ user2_bucket.new_key("user2obj1").set_contents_from_string("object content")
+ wait_for_queue_to_drain(topic_name, http_port=port)
+ http_server.verify_s3_events(
+ list(user1_bucket.list()) + list(user2_bucket.list()), exact_match=True
+ )
+
+ # create a new account topic with the same name as the existing topic
+ # note that the expected arn now contains the account ID
+ expected_topic_arn = (
+ "arn:aws:sns:" + zonegroup + ":" + account_id + ":" + topic_name
+ )
+ topic_conf = PSTopicS3(
+ user1_conn, topic_name, zonegroup, endpoint_args=endpoint_args
+ )
+ account_topic_arn = topic_conf.set_config()
+ assert_equal(account_topic_arn, expected_topic_arn)
+ log.info(
+ f"{user1_conn.access_key} created the account topic {account_topic_arn} with args {endpoint_args}"
+ )
+
+ # verify that the old topic is still functional
+ user1_bucket.new_key("user1obj1").set_contents_from_string("object content")
+ user2_bucket.new_key("user2obj1").set_contents_from_string("object content")
+ wait_for_queue_to_drain(topic_name, http_port=port)
+ # wait_for_queue_to_drain(topic_name, tenant=account_id, http_port=port)
+ http_server.verify_s3_events(
+ list(user1_bucket.list()) + list(user2_bucket.list()), exact_match=True
+ )
+
+ # change user1 bucket's subscription to the account topic - using the same notification ID but with the new account_topic_arn
+ topic_conf_list = [
+ {
+ "Id": notification_name,
+ "TopicArn": account_topic_arn,
+ "Events": ["s3:ObjectCreated:*"],
+ }
+ ]
+ s3_notification_conf1 = PSNotificationS3(
+ user1_conn, user1_bucket_name, topic_conf_list
+ )
+ _, status = s3_notification_conf1.set_config()
+ assert_equal(status / 100, 2)
+ # verify subscriptions to the account topic
+ rgw_topic_entry = [
+ t
+ for t in list_topics(tenant=account_id)["topics"]
+ if t["name"] == topic_name
+ ]
+ assert_equal(len(rgw_topic_entry), 1)
+ subscribed_buckets = rgw_topic_entry[0]["subscribed_buckets"]
+ assert_equal(len(subscribed_buckets), 1)
+ assert_in(user1_bucket_name, subscribed_buckets)
+ assert_not_in(user2_bucket_name, subscribed_buckets)
+ # verify bucket notifications while 2 test buckets are in the mixed mode
+ notification_list = list_notifications(user1_bucket_name, assert_len=1)
+ assert_equal(notification_list["notifications"][0]["TopicArn"], account_topic_arn)
+ notification_list = list_notifications(user2_bucket_name, assert_len=1)
+ assert_equal(notification_list["notifications"][0]["TopicArn"], topic_arn)
+
+ # verify both topics are functional at the same time with no duplicate notifications
+ user1_bucket.new_key("user1obj1").set_contents_from_string("object content")
+ user2_bucket.new_key("user2obj1").set_contents_from_string("object content")
+ wait_for_queue_to_drain(topic_name, http_port=port)
+ wait_for_queue_to_drain(topic_name, tenant=account_id, http_port=port)
+ http_server.verify_s3_events(
+ list(user1_bucket.list()) + list(user2_bucket.list()), exact_match=True
+ )
+
+ # also change user2 bucket's subscription to the account topic
+ s3_notification_conf2 = PSNotificationS3(
+ user2_conn, user2_bucket_name, topic_conf_list
+ )
+ _, status = s3_notification_conf2.set_config()
+ assert_equal(status / 100, 2)
+ # remove old topic
+ # note that, although account topic has the same name, it has to be scoped by account/tenant id to be removed
+ # so below command will only remove the old topic
+ _, rc = admin(["topic", "rm", "--topic", topic_name], get_config_cluster())
+ assert_equal(rc, 0)
+
+ # now verify account topic serves both buckets
+ rgw_topic_entry = [
+ t
+ for t in list_topics(tenant=account_id)["topics"]
+ if t["name"] == topic_name
+ ]
+ assert_equal(len(rgw_topic_entry), 1)
+ subscribed_buckets = rgw_topic_entry[0]["subscribed_buckets"]
+ assert_equal(len(subscribed_buckets), 2)
+ assert_in(user1_bucket_name, subscribed_buckets)
+ assert_in(user2_bucket_name, subscribed_buckets)
+ # verify bucket notifications after 2 test buckets are updated to use the account topic
+ notification_list = list_notifications(user1_bucket_name, assert_len=1)
+ assert_equal(notification_list["notifications"][0]["TopicArn"], account_topic_arn)
+ notification_list = list_notifications(user2_bucket_name, assert_len=1)
+ assert_equal(notification_list["notifications"][0]["TopicArn"], account_topic_arn)
+
+ # finally, make sure that notifications are going thru via the new account topic
+ user1_bucket.new_key("user1obj1").set_contents_from_string("object content")
+ user2_bucket.new_key("user2obj1").set_contents_from_string("object content")
+ wait_for_queue_to_drain(topic_name, tenant=account_id, http_port=port)
+ http_server.verify_s3_events(
+ list(user1_bucket.list()) + list(user2_bucket.list()), exact_match=True
+ )
+ log.info("topic migration test has completed successfully")
+ finally:
+ admin(["user", "rm", "--uid", user1_id, "--purge-data"], get_config_cluster())
+ admin(
+ ["bucket", "rm", "--bucket", user1_bucket_name, "--purge-data"],
+ get_config_cluster(),
+ )
+ admin(
+ ["bucket", "rm", "--bucket", user2_bucket_name, "--purge-data"],
+ get_config_cluster(),
+ )
+ admin(["account", "rm", "--account-id", account_id], get_config_cluster())
diff --git a/src/test/rgw/rgw_multi/tests.py b/src/test/rgw/rgw_multi/tests.py
index 2d49c7a0ce0..433cd034fe0 100644
--- a/src/test/rgw/rgw_multi/tests.py
+++ b/src/test/rgw/rgw_multi/tests.py
@@ -15,6 +15,7 @@ import boto
import boto.s3.connection
from boto.s3.website import WebsiteConfiguration
from boto.s3.cors import CORSConfiguration
+from botocore.exceptions import ClientError
from nose.tools import eq_ as eq
from nose.tools import assert_not_equal, assert_equal, assert_true, assert_false
@@ -573,6 +574,7 @@ def create_bucket_per_zone_in_realm():
b, z = create_bucket_per_zone(zg_conn)
buckets.extend(b)
zone_bucket.extend(z)
+ realm_meta_checkpoint(realm)
return buckets, zone_bucket
def test_bucket_create():
@@ -1212,6 +1214,9 @@ def test_datalog_autotrim():
# wait for metadata and data sync to catch up
zonegroup_meta_checkpoint(zonegroup)
zonegroup_data_checkpoint(zonegroup_conns)
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+ time.sleep(config.checkpoint_delay)
+ zonegroup_data_checkpoint(zonegroup_conns)
# trim each datalog
for zone, _ in zone_bucket:
@@ -3634,4 +3639,23 @@ def test_copy_object_different_bucket():
CopySource = source_bucket.name + '/' + objname)
zonegroup_bucket_checkpoint(zonegroup_conns, dest_bucket.name)
-
+
+def test_bucket_create_location_constraint():
+ for zonegroup in realm.current_period.zonegroups:
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ for zg in realm.current_period.zonegroups:
+ z = zonegroup_conns.rw_zones[0]
+ bucket_name = gen_bucket_name()
+ if zg.name == zonegroup.name:
+ # my zonegroup should pass
+ z.s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={'LocationConstraint': zg.name})
+ # check bucket location
+ response = z.s3_client.get_bucket_location(Bucket=bucket_name)
+ assert_equal(response['LocationConstraint'], zg.name)
+ else:
+ # other zonegroup should fail with 400
+ e = assert_raises(ClientError,
+ z.s3_client.create_bucket,
+ Bucket=bucket_name,
+ CreateBucketConfiguration={'LocationConstraint': zg.name})
+ assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400
diff --git a/src/test/rgw/test-rgw-common.sh b/src/test/rgw/test-rgw-common.sh
index ef8d80e132b..6798a15ba31 100644
--- a/src/test/rgw/test-rgw-common.sh
+++ b/src/test/rgw/test-rgw-common.sh
@@ -99,11 +99,11 @@ function init_first_zone {
secret=$7
# initialize realm
- x $(rgw_admin $cid) realm create --rgw-realm=$realm
+ x $(rgw_admin $cid) realm create --rgw-realm=$realm --default
# create zonegroup, zone
x $(rgw_admin $cid) zonegroup create --rgw-zonegroup=$zg --master --default
- x $(rgw_admin $cid) zone create --rgw-zonegroup=$zg --rgw-zone=$zone --access-key=${access_key} --secret=${secret} --endpoints=$endpoints --default
+ x $(rgw_admin $cid) zone create --rgw-zonegroup=$zg --rgw-zone=$zone --access-key=${access_key} --secret=${secret} --endpoints=$endpoints --master --default
x $(rgw_admin $cid) user create --uid=zone.user --display-name=ZoneUser --access-key=${access_key} --secret=${secret} --system
x $(rgw_admin $cid) period update --commit
@@ -128,7 +128,7 @@ function init_zone_in_existing_zg {
x $(rgw_admin $cid) period update --commit
}
-function init_first_zone_in_slave_zg {
+function init_first_zone_in_peer_zg {
[ $# -ne 8 ] && echo "init_first_zone_in_slave_zg() needs 8 params" && exit 1
cid=$1
diff --git a/src/test/rgw/test-rgw-multisite.sh b/src/test/rgw/test-rgw-multisite.sh
index a005b19e3da..d3a1b265ca6 100755
--- a/src/test/rgw/test-rgw-multisite.sh
+++ b/src/test/rgw/test-rgw-multisite.sh
@@ -1,11 +1,12 @@
#!/usr/bin/env bash
-[ $# -lt 1 ] && echo "usage: $0 <num-clusters> [rgw parameters...]" && exit 1
+[ $# -lt 1 ] && echo "usage: $0 <num-zones> <num-zonegroups>[rgw parameters...]" && exit 1
-num_clusters=$1
+num_zones=$1
+num_zonegroups=$2
shift
-[ $num_clusters -lt 1 ] && echo "clusters num must be at least 1" && exit 1
+[ $num_zones -lt 1 ] && echo "clusters num must be at least 1" && exit 1
. "`dirname $0`/test-rgw-common.sh"
. "`dirname $0`/test-rgw-meta-sync.sh"
@@ -53,7 +54,7 @@ echo realm_status=$output
endpoints=""
i=2
-while [ $i -le $num_clusters ]; do
+while [ $i -le $num_zones ]; do
x $(start_ceph_cluster c$i) -n $(get_mstart_parameters $i)
j=1
endpoints=""
@@ -74,10 +75,53 @@ while [ $i -le $num_clusters ]; do
i=$((i+1))
done
-i=2
-while [ $i -le $num_clusters ]; do
- wait_for_meta_sync c1 c$i $realm_name
+endpoints=""
+k=2
+while [ $k -le $num_zonegroups ]; do
+ x $(start_ceph_cluster c$i) -n $(get_mstart_parameters $i)
+ j=1
+ endpoints=""
+ while [ $j -le $rgws ]; do
+ port=$((8000+i*100+j))
+ endpoints="$endpoints""$url:$port,"
+ j=$((j+1))
+ done
+ # create new zone, start rgw
+ init_first_zone_in_peer_zg c$i $realm_name zg${k} zg${k}-${i} 8101 $endpoints $system_access_key $system_secret
+ j=1
+ while [ $j -le $rgws ]; do
+ port=$((8000+i*100+j))
+ x $(rgw c$i "$port" "$@")
+ j="$((j+1))"
+ done
+# bring up next clusters in zonegroup k
i=$((i+1))
+
+ endpoints=""
+ l=2
+ while [ $l -le $num_zones ]; do
+ x $(start_ceph_cluster c$i) -n $(get_mstart_parameters $i)
+ j=1
+ endpoints=""
+ while [ $j -le $rgws ]; do
+ port=$((8000+i*100+j))
+ endpoints="$endpoints""$url:$port,"
+ j=$((j+1))
+ done
+
+ # create new zone, start rgw
+ init_zone_in_existing_zg c$i $realm_name zg${k} zg${k}-${i} 8101 $endpoints $zone_port $system_access_key $system_secret
+ j=1
+ while [ $j -le $rgws ]; do
+ port=$((8000+i*100+j))
+ x $(rgw c$i "$port" "$@")
+ j="$((j+1))"
+ done
+ l=$((l+1))
+ i=$((i+1))
+ done
+
+ k=$((k+1))
done
diff --git a/src/test/rgw/test_log_backing.cc b/src/test/rgw/test_log_backing.cc
index e4109d535d1..a6de690af0f 100644
--- a/src/test/rgw/test_log_backing.cc
+++ b/src/test/rgw/test_log_backing.cc
@@ -20,6 +20,7 @@
#include <fmt/format.h>
+#include "common/Clock.h" // for ceph_clock_now()
#include "include/types.h"
#include "include/rados/librados.hpp"
diff --git a/src/test/rgw/test_multi.py b/src/test/rgw/test_multi.py
index cef6850c88a..1ccd43b5eaa 100644
--- a/src/test/rgw/test_multi.py
+++ b/src/test/rgw/test_multi.py
@@ -251,7 +251,7 @@ def init(parse_args):
realm = multisite.Realm('r')
if bootstrap:
# create the realm on c1
- realm.create(c1)
+ realm.create(c1, ['--default'])
else:
realm.get(c1)
period = multisite.Period(realm=realm)
@@ -305,7 +305,7 @@ def init(parse_args):
cluster.start()
# pull realm configuration from the master's gateway
gateway = realm.meta_master_zone().gateways[0]
- realm.pull(cluster, gateway, admin_creds)
+ realm.pull(cluster, gateway, admin_creds, ['--default'])
endpoints = zone_endpoints(zg, z, args.gateways_per_zone)
if is_master:
@@ -382,7 +382,9 @@ def init(parse_args):
arg += admin_creds.credential_args()
admin_user.create(zone, arg)
# create test account/user
- cluster.admin(['account', 'create', '--account-id', user.account])
+ arg = ['--account-id', user.account]
+ arg += zone.zone_args()
+ cluster.admin(['account', 'create'] + arg)
arg = ['--display-name', 'TestUser']
arg += user_creds.credential_args()
user.create(zone, arg)
diff --git a/src/test/rgw/test_rgw_iam_policy.cc b/src/test/rgw/test_rgw_iam_policy.cc
index 7dadb7812ff..1d13c2aa013 100644
--- a/src/test/rgw/test_rgw_iam_policy.cc
+++ b/src/test/rgw/test_rgw_iam_policy.cc
@@ -75,6 +75,8 @@ using rgw::IAM::s3GetObjectTagging;
using rgw::IAM::s3GetObjectVersion;
using rgw::IAM::s3GetObjectVersionTagging;
using rgw::IAM::s3GetObjectVersionTorrent;
+using rgw::IAM::s3GetObjectAttributes;
+using rgw::IAM::s3GetObjectVersionAttributes;
using rgw::IAM::s3GetPublicAccessBlock;
using rgw::IAM::s3GetReplicationConfiguration;
using rgw::IAM::s3ListAllMyBuckets;
@@ -419,6 +421,8 @@ TEST_F(PolicyTest, Parse3) {
act2[s3GetObjectVersionAcl] = 1;
act2[s3GetObjectTorrent] = 1;
act2[s3GetObjectVersionTorrent] = 1;
+ act2[s3GetObjectAttributes] = 1;
+ act2[s3GetObjectVersionAttributes] = 1;
act2[s3GetAccelerateConfiguration] = 1;
act2[s3GetBucketAcl] = 1;
act2[s3GetBucketOwnershipControls] = 1;
@@ -487,6 +491,8 @@ TEST_F(PolicyTest, Eval3) {
s3allow[s3GetObjectVersion] = 1;
s3allow[s3GetObjectAcl] = 1;
s3allow[s3GetObjectVersionAcl] = 1;
+ s3allow[s3GetObjectAttributes] = 1;
+ s3allow[s3GetObjectVersionAttributes] = 1;
s3allow[s3GetObjectTorrent] = 1;
s3allow[s3GetObjectVersionTorrent] = 1;
s3allow[s3GetAccelerateConfiguration] = 1;
@@ -883,6 +889,8 @@ TEST_F(ManagedPolicyTest, AmazonS3ReadOnlyAccess)
act[s3GetObjectVersionAcl] = 1;
act[s3GetObjectTorrent] = 1;
act[s3GetObjectVersionTorrent] = 1;
+ act[s3GetObjectAttributes] = 1;
+ act[s3GetObjectVersionAttributes] = 1;
act[s3GetAccelerateConfiguration] = 1;
act[s3GetBucketAcl] = 1;
act[s3GetBucketOwnershipControls] = 1;
diff --git a/src/test/rgw/test_rgw_lua.cc b/src/test/rgw/test_rgw_lua.cc
index b2e11e442a2..ad923023a6d 100644
--- a/src/test/rgw/test_rgw_lua.cc
+++ b/src/test/rgw/test_rgw_lua.cc
@@ -9,6 +9,7 @@
#include "rgw_lua_background.h"
#include "rgw_lua_data_filter.h"
#include "rgw_sal_config.h"
+#include "rgw_perf_counters.h"
using namespace std;
using namespace rgw;
@@ -184,9 +185,51 @@ inline std::unique_ptr<sal::RadosStore> make_store() {
return std::make_unique<StoreBundle>(std::move(context_pool));
};
+class TestLuaManager : public rgw::sal::StoreLuaManager {
+ public:
+ std::string lua_script;
+ unsigned read_time = 0;
+ TestLuaManager() {
+ rgw_perf_start(g_cct);
+ }
+ int get_script(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, std::string& script) override {
+ std::this_thread::sleep_for(std::chrono::seconds(read_time));
+ script = lua_script;
+ return 0;
+ }
+ int put_script(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, const std::string& script) override {
+ return 0;
+ }
+ int del_script(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key) override {
+ return 0;
+ }
+ int add_package(const DoutPrefixProvider* dpp, optional_yield y, const std::string& package_name) override {
+ return 0;
+ }
+ int remove_package(const DoutPrefixProvider* dpp, optional_yield y, const std::string& package_name) override {
+ return 0;
+ }
+ int list_packages(const DoutPrefixProvider* dpp, optional_yield y, rgw::lua::packages_t& packages) override {
+ return 0;
+ }
+ int reload_packages(const DoutPrefixProvider* dpp, optional_yield y) override {
+ return 0;
+ }
+ ~TestLuaManager() {
+ rgw_perf_stop(g_cct);
+ }
+};
+
+void set_script(rgw::sal::LuaManager* manager, const std::string& script) {
+ static_cast<TestLuaManager*>(manager)->lua_script = script;
+}
+void set_read_time(rgw::sal::LuaManager* manager, unsigned read_time) {
+ static_cast<TestLuaManager*>(manager)->read_time = read_time;
+}
+
#define DEFINE_REQ_STATE RGWProcessEnv pe; \
auto store = make_store(); \
- pe.lua.manager = store->get_lua_manager(""); \
+ pe.lua.manager = std::make_unique<TestLuaManager>(); \
RGWEnv e; \
req_state s(g_cct, pe, &e, 0);
@@ -850,24 +893,12 @@ TEST(TestRGWLua, OpsLog)
}
class TestBackground : public rgw::lua::Background {
- const unsigned read_time;
-
-protected:
- int read_script() override {
- // don't read the object from the store
- std::this_thread::sleep_for(std::chrono::seconds(read_time));
- return 0;
- }
-
public:
- TestBackground(sal::RadosStore* store, const std::string& script, rgw::sal::LuaManager* manager, unsigned read_time = 0) :
+ TestBackground(sal::RadosStore* store, rgw::sal::LuaManager* manager) :
rgw::lua::Background(store,
g_cct,
manager,
- 1 /* run every second */),
- read_time(read_time) {
- // the script is passed in the constructor
- rgw_script = script;
+ 1 /* run every second */) {
}
~TestBackground() override {
@@ -878,20 +909,19 @@ public:
TEST(TestRGWLuaBackground, Start)
{
auto store = make_store();
- auto manager = store->get_lua_manager("");
+ auto manager = std::make_unique<TestLuaManager>();
{
// ctr and dtor without running
- TestBackground lua_background(store.get(), "", manager.get());
+ TestBackground lua_background(store.get(), manager.get());
}
{
// ctr and dtor with running
- TestBackground lua_background(store.get(), "", manager.get());
+ TestBackground lua_background(store.get(), manager.get());
lua_background.start();
}
}
-
-constexpr auto wait_time = std::chrono::seconds(3);
+constexpr auto wait_time = std::chrono::milliseconds(100);
template<typename T>
const T& get_table_value(const TestBackground& b, const std::string& index) {
@@ -903,6 +933,15 @@ const T& get_table_value(const TestBackground& b, const std::string& index) {
}
}
+#define WAIT_FOR_BACKGROUND \
+{ \
+ unsigned max_tries = 100; \
+ do { \
+ std::this_thread::sleep_for(wait_time); \
+ --max_tries; \
+ } while (perfcounter->get(l_rgw_lua_script_ok) + perfcounter->get(l_rgw_lua_script_fail) == 0 && max_tries > 0); \
+}
+
TEST(TestRGWLuaBackground, Script)
{
const std::string script = R"(
@@ -912,10 +951,11 @@ TEST(TestRGWLuaBackground, Script)
)";
auto store = make_store();
- auto manager = store->get_lua_manager("");
- TestBackground lua_background(store.get(), script, manager.get());
+ auto manager = std::make_unique<TestLuaManager>();
+ set_script(manager.get(), script);
+ TestBackground lua_background(store.get(), manager.get());
lua_background.start();
- std::this_thread::sleep_for(wait_time);
+ WAIT_FOR_BACKGROUND;
EXPECT_EQ(get_table_value<std::string>(lua_background, "hello"), "world");
}
@@ -928,9 +968,10 @@ TEST(TestRGWLuaBackground, RequestScript)
)";
DEFINE_REQ_STATE;
- TestBackground lua_background(store.get(), background_script, pe.lua.manager.get());
+ set_script(pe.lua.manager.get(), background_script);
+ TestBackground lua_background(store.get(), pe.lua.manager.get());
lua_background.start();
- std::this_thread::sleep_for(wait_time);
+ WAIT_FOR_BACKGROUND;
const std::string request_script = R"(
local key = "hello"
@@ -947,8 +988,9 @@ TEST(TestRGWLuaBackground, RequestScript)
ASSERT_EQ(rc, 0);
EXPECT_EQ(get_table_value<std::string>(lua_background, "hello"), "from request");
// now we resume and let the background set the value
+ perfcounter->set(l_rgw_lua_script_ok, 0);
lua_background.resume(store.get());
- std::this_thread::sleep_for(wait_time);
+ WAIT_FOR_BACKGROUND;
EXPECT_EQ(get_table_value<std::string>(lua_background, "hello"), "from background");
}
@@ -965,14 +1007,16 @@ TEST(TestRGWLuaBackground, Pause)
)";
auto store = make_store();
- auto manager = store->get_lua_manager("");
- TestBackground lua_background(store.get(), script, manager.get());
+ auto manager = std::make_unique<TestLuaManager>();
+ set_script(manager.get(), script);
+ TestBackground lua_background(store.get(), manager.get());
lua_background.start();
- std::this_thread::sleep_for(wait_time);
+ WAIT_FOR_BACKGROUND;
const auto value_len = get_table_value<std::string>(lua_background, "hello").size();
EXPECT_GT(value_len, 0);
lua_background.pause();
- std::this_thread::sleep_for(wait_time);
+ // make sure no execution occurs
+ std::this_thread::sleep_for(wait_time*10);
// no change in len
EXPECT_EQ(value_len, get_table_value<std::string>(lua_background, "hello").size());
}
@@ -991,15 +1035,17 @@ TEST(TestRGWLuaBackground, PauseWhileReading)
)";
auto store = make_store();
- auto manager = store->get_lua_manager("");
- TestBackground lua_background(store.get(), script, manager.get(), 2);
+ auto manager = std::make_unique<TestLuaManager>();
+ set_script(manager.get(), script);
+ set_read_time(manager.get(), 2);
+ TestBackground lua_background(store.get(), manager.get());
lua_background.start();
- constexpr auto long_wait_time = std::chrono::seconds(6);
- std::this_thread::sleep_for(long_wait_time);
+ WAIT_FOR_BACKGROUND;
const auto value_len = get_table_value<std::string>(lua_background, "hello").size();
EXPECT_GT(value_len, 0);
lua_background.pause();
- std::this_thread::sleep_for(long_wait_time);
+ // make sure no execution occurs
+ std::this_thread::sleep_for(wait_time*10);
// one execution might occur after pause
EXPECT_TRUE(value_len + 1 >= get_table_value<std::string>(lua_background, "hello").size());
}
@@ -1013,14 +1059,16 @@ TEST(TestRGWLuaBackground, ReadWhilePaused)
)";
auto store = make_store();
- auto manager = store->get_lua_manager("");
- TestBackground lua_background(store.get(), script, manager.get());
+ auto manager = std::make_unique<TestLuaManager>();
+ set_script(manager.get(), script);
+ TestBackground lua_background(store.get(), manager.get());
lua_background.pause();
lua_background.start();
- std::this_thread::sleep_for(wait_time);
+ // make sure no execution occurs
+ std::this_thread::sleep_for(wait_time*10);
EXPECT_EQ(get_table_value<std::string>(lua_background, "hello"), "");
lua_background.resume(store.get());
- std::this_thread::sleep_for(wait_time);
+ WAIT_FOR_BACKGROUND;
EXPECT_EQ(get_table_value<std::string>(lua_background, "hello"), "world");
}
@@ -1037,18 +1085,21 @@ TEST(TestRGWLuaBackground, PauseResume)
)";
auto store = make_store();
- auto manager = store->get_lua_manager("");
- TestBackground lua_background(store.get(), script, manager.get());
+ auto manager = std::make_unique<TestLuaManager>();
+ set_script(manager.get(), script);
+ TestBackground lua_background(store.get(), manager.get());
lua_background.start();
- std::this_thread::sleep_for(wait_time);
+ WAIT_FOR_BACKGROUND;
const auto value_len = get_table_value<std::string>(lua_background, "hello").size();
EXPECT_GT(value_len, 0);
lua_background.pause();
- std::this_thread::sleep_for(wait_time);
+ // make sure no execution occurs
+ std::this_thread::sleep_for(wait_time*10);
// no change in len
EXPECT_EQ(value_len, get_table_value<std::string>(lua_background, "hello").size());
+ perfcounter->set(l_rgw_lua_script_ok, 0);
lua_background.resume(store.get());
- std::this_thread::sleep_for(wait_time);
+ WAIT_FOR_BACKGROUND;
// should be a change in len
EXPECT_GT(get_table_value<std::string>(lua_background, "hello").size(), value_len);
}
@@ -1066,18 +1117,19 @@ TEST(TestRGWLuaBackground, MultipleStarts)
)";
auto store = make_store();
- auto manager = store->get_lua_manager("");
- TestBackground lua_background(store.get(), script, manager.get());
+ auto manager = std::make_unique<TestLuaManager>();
+ set_script(manager.get(), script);
+ TestBackground lua_background(store.get(), manager.get());
lua_background.start();
- std::this_thread::sleep_for(wait_time);
+ WAIT_FOR_BACKGROUND;
const auto value_len = get_table_value<std::string>(lua_background, "hello").size();
EXPECT_GT(value_len, 0);
lua_background.start();
lua_background.shutdown();
lua_background.shutdown();
- std::this_thread::sleep_for(wait_time);
+ perfcounter->set(l_rgw_lua_script_ok, 0);
lua_background.start();
- std::this_thread::sleep_for(wait_time);
+ WAIT_FOR_BACKGROUND;
// should be a change in len
EXPECT_GT(get_table_value<std::string>(lua_background, "hello").size(), value_len);
}
@@ -1085,7 +1137,7 @@ TEST(TestRGWLuaBackground, MultipleStarts)
TEST(TestRGWLuaBackground, TableValues)
{
DEFINE_REQ_STATE;
- TestBackground lua_background(store.get(), "", pe.lua.manager.get());
+ TestBackground lua_background(store.get(), pe.lua.manager.get());
const std::string request_script = R"(
RGW["key1"] = "string value"
@@ -1107,7 +1159,7 @@ TEST(TestRGWLuaBackground, TableValues)
TEST(TestRGWLuaBackground, TablePersist)
{
DEFINE_REQ_STATE;
- TestBackground lua_background(store.get(), "", pe.lua.manager.get());
+ TestBackground lua_background(store.get(), pe.lua.manager.get());
std::string request_script = R"(
RGW["key1"] = "string value"
@@ -1137,7 +1189,7 @@ TEST(TestRGWLuaBackground, TablePersist)
TEST(TestRGWLuaBackground, TableValuesFromRequest)
{
DEFINE_REQ_STATE;
- TestBackground lua_background(store.get(), "", pe.lua.manager.get());
+ TestBackground lua_background(store.get(), pe.lua.manager.get());
lua_background.start();
const std::string request_script = R"(
@@ -1165,7 +1217,7 @@ TEST(TestRGWLuaBackground, TableValuesFromRequest)
TEST(TestRGWLuaBackground, TableInvalidValue)
{
DEFINE_REQ_STATE;
- TestBackground lua_background(store.get(), "", pe.lua.manager.get());
+ TestBackground lua_background(store.get(), pe.lua.manager.get());
lua_background.start();
const std::string request_script = R"(
@@ -1191,7 +1243,7 @@ TEST(TestRGWLuaBackground, TableInvalidValue)
TEST(TestRGWLuaBackground, TableErase)
{
DEFINE_REQ_STATE;
- TestBackground lua_background(store.get(), "", pe.lua.manager.get());
+ TestBackground lua_background(store.get(), pe.lua.manager.get());
std::string request_script = R"(
RGW["size"] = 0
@@ -1229,7 +1281,7 @@ TEST(TestRGWLuaBackground, TableErase)
TEST(TestRGWLuaBackground, TableIterate)
{
DEFINE_REQ_STATE;
- TestBackground lua_background(store.get(), "", pe.lua.manager.get());
+ TestBackground lua_background(store.get(), pe.lua.manager.get());
const std::string request_script = R"(
RGW["key1"] = "string value"
@@ -1256,7 +1308,7 @@ TEST(TestRGWLuaBackground, TableIterate)
TEST(TestRGWLuaBackground, TableIterateWrite)
{
DEFINE_REQ_STATE;
- TestBackground lua_background(store.get(), "", pe.lua.manager.get());
+ TestBackground lua_background(store.get(), pe.lua.manager.get());
const std::string request_script = R"(
RGW["a"] = 1
@@ -1286,7 +1338,7 @@ TEST(TestRGWLuaBackground, TableIterateWrite)
TEST(TestRGWLuaBackground, TableIncrement)
{
DEFINE_REQ_STATE;
- TestBackground lua_background(store.get(), "", pe.lua.manager.get());
+ TestBackground lua_background(store.get(), pe.lua.manager.get());
const std::string request_script = R"(
RGW["key1"] = 42
@@ -1306,7 +1358,7 @@ TEST(TestRGWLuaBackground, TableIncrement)
TEST(TestRGWLuaBackground, TableIncrementBy)
{
DEFINE_REQ_STATE;
- TestBackground lua_background(store.get(), "", pe.lua.manager.get());
+ TestBackground lua_background(store.get(), pe.lua.manager.get());
const std::string request_script = R"(
RGW["key1"] = 42
@@ -1328,7 +1380,7 @@ TEST(TestRGWLuaBackground, TableIncrementBy)
TEST(TestRGWLuaBackground, TableDecrement)
{
DEFINE_REQ_STATE;
- TestBackground lua_background(store.get(), "", pe.lua.manager.get());
+ TestBackground lua_background(store.get(), pe.lua.manager.get());
const std::string request_script = R"(
RGW["key1"] = 42
@@ -1348,7 +1400,7 @@ TEST(TestRGWLuaBackground, TableDecrement)
TEST(TestRGWLuaBackground, TableDecrementBy)
{
DEFINE_REQ_STATE;
- TestBackground lua_background(store.get(), "", pe.lua.manager.get());
+ TestBackground lua_background(store.get(), pe.lua.manager.get());
const std::string request_script = R"(
RGW["key1"] = 42
@@ -1370,7 +1422,7 @@ TEST(TestRGWLuaBackground, TableDecrementBy)
TEST(TestRGWLuaBackground, TableIncrementValueError)
{
DEFINE_REQ_STATE;
- TestBackground lua_background(store.get(), "", pe.lua.manager.get());
+ TestBackground lua_background(store.get(), pe.lua.manager.get());
std::string request_script = R"(
-- cannot increment string values
@@ -1405,7 +1457,7 @@ TEST(TestRGWLuaBackground, TableIncrementValueError)
TEST(TestRGWLuaBackground, TableIncrementError)
{
DEFINE_REQ_STATE;
- TestBackground lua_background(store.get(), "", pe.lua.manager.get());
+ TestBackground lua_background(store.get(), pe.lua.manager.get());
std::string request_script = R"(
-- missing argument
@@ -1494,7 +1546,7 @@ TEST(TestRGWLua, Data)
)";
DEFINE_REQ_STATE;
- TestBackground lua_background(store.get(), "", pe.lua.manager.get());
+ TestBackground lua_background(store.get(), pe.lua.manager.get());
s.host_id = "foo";
pe.lua.background = &lua_background;
lua::RGWObjFilter filter(&s, script);
diff --git a/src/test/rgw/test_rgw_posix_driver.cc b/src/test/rgw/test_rgw_posix_driver.cc
index caf3c41c8d4..a8b0f9bb348 100644
--- a/src/test/rgw/test_rgw_posix_driver.cc
+++ b/src/test/rgw/test_rgw_posix_driver.cc
@@ -17,6 +17,7 @@
#include <iostream>
#include <filesystem>
#include "common/common_init.h"
+#include "common/errno.h"
#include "global/global_init.h"
using namespace rgw::sal;
@@ -1709,12 +1710,13 @@ public:
off_t ofs{0};
uint64_t accounted_size{0};
std::string tag;
+ rgw::sal::MultipartUpload::prefix_map_t processed_prefixes;
ACLOwner owner;
owner.id = bucket->get_owner();
int ret = upload->complete(env->dpp, null_yield, get_pointer(env->cct), parts,
remove_objs, accounted_size, compressed, cs_info,
- ofs, tag, owner, 0, mp_obj.get());
+ ofs, tag, owner, 0, mp_obj.get(), processed_prefixes);
EXPECT_EQ(ret, 0);
EXPECT_EQ(write_size, ofs);
EXPECT_EQ(write_size, accounted_size);
@@ -2480,6 +2482,7 @@ public:
off_t ofs{0};
uint64_t accounted_size{0};
std::string tag;
+ rgw::sal::MultipartUpload::prefix_map_t processed_prefixes;
ACLOwner owner;
owner.id = bucket->get_owner();
mp_obj->gen_rand_obj_instance_name();
@@ -2489,7 +2492,7 @@ public:
int ret = upload->complete(env->dpp, null_yield, get_pointer(env->cct), parts,
remove_objs, accounted_size, compressed, cs_info,
- ofs, tag, owner, 0, mp_obj.get());
+ ofs, tag, owner, 0, mp_obj.get(), processed_prefixes);
EXPECT_EQ(ret, 0);
EXPECT_EQ(write_size, ofs);
EXPECT_EQ(write_size, accounted_size);