diff options
author | Yuval Lifshitz <ylifshit@ibm.com> | 2024-12-26 11:48:54 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-26 11:48:54 +0100 |
commit | 34bfbd07e7a3f82b5bb279201882d80e1cbddc19 (patch) | |
tree | b59b927040579c8a56025a0ac47626e648c0ed1b /qa | |
parent | Merge pull request #61144 from cyx1231st/wip-seastore-improve-backref (diff) | |
parent | rgw: allow bucket notification send message to kafka with multiple brokers (diff) | |
download | ceph-34bfbd07e7a3f82b5bb279201882d80e1cbddc19.tar.xz ceph-34bfbd07e7a3f82b5bb279201882d80e1cbddc19.zip |
Merge pull request #60988 from thuvh/feat/kafka_multiple_brokers
rgw: allow bucket notification send message to kafka with multiple brokers
Diffstat (limited to 'qa')
6 files changed, 274 insertions, 1 deletions
diff --git a/qa/suites/rgw/notifications/tasks/kafka_failover/+ b/qa/suites/rgw/notifications/tasks/kafka_failover/+ new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/qa/suites/rgw/notifications/tasks/kafka_failover/+ diff --git a/qa/suites/rgw/notifications/tasks/kafka_failover/0-install.yaml b/qa/suites/rgw/notifications/tasks/kafka_failover/0-install.yaml new file mode 100644 index 00000000000..5c83d5c0d23 --- /dev/null +++ b/qa/suites/rgw/notifications/tasks/kafka_failover/0-install.yaml @@ -0,0 +1,20 @@ +tasks: +- install: +- ceph: +- openssl_keys: +- rgw: + client.0: + +overrides: + install: + ceph: + extra_system_packages: + rpm: + - java + deb: + - default-jre + ceph: + conf: + global: + osd_min_pg_log_entries: 10 + osd_max_pg_log_entries: 10 diff --git a/qa/suites/rgw/notifications/tasks/kafka_failover/supported-distros b/qa/suites/rgw/notifications/tasks/kafka_failover/supported-distros new file mode 120000 index 00000000000..46280a42a96 --- /dev/null +++ b/qa/suites/rgw/notifications/tasks/kafka_failover/supported-distros @@ -0,0 +1 @@ +../../.qa/distros/supported-random-distro$/
\ No newline at end of file diff --git a/qa/suites/rgw/notifications/tasks/kafka_failover/test_kafka.yaml b/qa/suites/rgw/notifications/tasks/kafka_failover/test_kafka.yaml new file mode 100644 index 00000000000..01d6fc637de --- /dev/null +++ b/qa/suites/rgw/notifications/tasks/kafka_failover/test_kafka.yaml @@ -0,0 +1,8 @@ +tasks: +- kafka-failover: + client.0: + kafka_version: 3.8.1 +- notification-tests: + client.0: + extra_attr: ["kafka_failover"] + rgw_server: client.0 diff --git a/qa/tasks/kafka_failover.py b/qa/tasks/kafka_failover.py new file mode 100644 index 00000000000..3ca60ab84fc --- /dev/null +++ b/qa/tasks/kafka_failover.py @@ -0,0 +1,244 @@ +""" +Deploy and configure Kafka for Teuthology +""" +import contextlib +import logging +import time +import os + +from teuthology import misc as teuthology +from teuthology import contextutil +from teuthology.orchestra import run + +log = logging.getLogger(__name__) + +def get_kafka_version(config): + for client, client_config in config.items(): + if 'kafka_version' in client_config: + kafka_version = client_config.get('kafka_version') + return kafka_version + +kafka_prefix = 'kafka_2.13-' + +def get_kafka_dir(ctx, config): + kafka_version = get_kafka_version(config) + current_version = kafka_prefix + kafka_version + return '{tdir}/{ver}'.format(tdir=teuthology.get_testdir(ctx),ver=current_version) + + +@contextlib.contextmanager +def install_kafka(ctx, config): + """ + Downloading the kafka tar file. + """ + assert isinstance(config, dict) + log.info('Installing Kafka...') + + # programmatically find a nearby mirror so as not to hammer archive.apache.org + apache_mirror_cmd="curl 'https://www.apache.org/dyn/closer.cgi' 2>/dev/null | " \ + "grep -o '<strong>[^<]*</strong>' | sed 's/<[^>]*>//g' | head -n 1" + log.info("determining apache mirror by running: " + apache_mirror_cmd) + apache_mirror_url_front = os.popen(apache_mirror_cmd).read().rstrip() # note: includes trailing slash (/) + log.info("chosen apache mirror is " + apache_mirror_url_front) + + for (client, _) in config.items(): + (remote,) = ctx.cluster.only(client).remotes.keys() + test_dir=teuthology.get_testdir(ctx) + current_version = get_kafka_version(config) + + kafka_file = kafka_prefix + current_version + '.tgz' + + link1 = '{apache_mirror_url_front}/kafka/'.format(apache_mirror_url_front=apache_mirror_url_front) + \ + current_version + '/' + kafka_file + ctx.cluster.only(client).run( + args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', link1], + ) + + ctx.cluster.only(client).run( + args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', kafka_file], + ) + + kafka_dir = get_kafka_dir(ctx, config) + # create config for second broker + second_broker_config_name = "server2.properties" + second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir) + second_broker_data_logs_escaped = "{}/logs".format(second_broker_data).replace("/", "\/") + + ctx.cluster.only(client).run( + args=['cd', '{tdir}'.format(tdir=kafka_dir), run.Raw('&&'), + 'cp', '{tdir}/config/server.properties'.format(tdir=kafka_dir), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), + 'mkdir', '-p', '{tdir}/data'.format(tdir=kafka_dir) + ], + ) + + # edit config + ctx.cluster.only(client).run( + args=['sed', '-i', 's/broker.id=0/broker.id=1/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), + 'sed', '-i', 's/#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), + 'sed', '-i', 's/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), + 'sed', '-i', 's/log.dirs=\/tmp\/kafka-logs/log.dirs={}/g'.format(second_broker_data_logs_escaped), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), + 'cat', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name) + ] + ) + + try: + yield + finally: + log.info('Removing packaged dependencies of Kafka...') + test_dir=get_kafka_dir(ctx, config) + current_version = get_kafka_version(config) + for (client,_) in config.items(): + ctx.cluster.only(client).run( + args=['rm', '-rf', '{tdir}/logs'.format(tdir=test_dir)], + ) + + ctx.cluster.only(client).run( + args=['rm', '-rf', test_dir], + ) + + ctx.cluster.only(client).run( + args=['rm', '-rf', '{tdir}/{doc}'.format(tdir=teuthology.get_testdir(ctx),doc=kafka_file)], + ) + + +@contextlib.contextmanager +def run_kafka(ctx,config): + """ + This includes two parts: + 1. Starting Zookeeper service + 2. Starting Kafka service + """ + assert isinstance(config, dict) + log.info('Bringing up Zookeeper and Kafka services...') + for (client,_) in config.items(): + (remote,) = ctx.cluster.only(client).remotes.keys() + kafka_dir = get_kafka_dir(ctx, config) + + second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir) + second_broker_java_log_dir = "{}/java_logs".format(second_broker_data) + + ctx.cluster.only(client).run( + args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'), + './zookeeper-server-start.sh', + '{tir}/config/zookeeper.properties'.format(tir=kafka_dir), + run.Raw('&'), 'exit' + ], + ) + + ctx.cluster.only(client).run( + args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'), + './kafka-server-start.sh', + '{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)), + run.Raw('&'), 'exit' + ], + ) + + ctx.cluster.only(client).run( + args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'), + run.Raw('LOG_DIR={second_broker_java_log_dir}'.format(second_broker_java_log_dir=second_broker_java_log_dir)), + './kafka-server-start.sh', '{tdir}/config/server2.properties'.format(tdir=kafka_dir), + run.Raw('&'), 'exit' + ], + ) + + try: + yield + finally: + log.info('Stopping Zookeeper and Kafka Services...') + + for (client, _) in config.items(): + (remote,) = ctx.cluster.only(client).remotes.keys() + + ctx.cluster.only(client).run( + args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), + './kafka-server-stop.sh', + '{tir}/config/kafka.properties'.format(tir=get_kafka_dir(ctx, config)), + ], + ) + + time.sleep(5) + + ctx.cluster.only(client).run( + args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), + './zookeeper-server-stop.sh', + '{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)), + ], + ) + + time.sleep(5) + + ctx.cluster.only(client).run(args=['killall', '-9', 'java']) + + +@contextlib.contextmanager +def run_admin_cmds(ctx,config): + """ + Running Kafka Admin commands in order to check the working of producer anf consumer and creation of topic. + """ + assert isinstance(config, dict) + log.info('Checking kafka server through producer/consumer commands...') + for (client,_) in config.items(): + (remote,) = ctx.cluster.only(client).remotes.keys() + + ctx.cluster.only(client).run( + args=[ + 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), + './kafka-topics.sh', '--create', '--topic', 'quickstart-events', + '--bootstrap-server', 'localhost:9092' + ], + ) + + ctx.cluster.only(client).run( + args=[ + 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), + 'echo', "First", run.Raw('|'), + './kafka-console-producer.sh', '--topic', 'quickstart-events', + '--bootstrap-server', 'localhost:9092' + ], + ) + + ctx.cluster.only(client).run( + args=[ + 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), + './kafka-console-consumer.sh', '--topic', 'quickstart-events', + '--from-beginning', + '--bootstrap-server', 'localhost:9092', + run.Raw('&'), 'exit' + ], + ) + + try: + yield + finally: + pass + + +@contextlib.contextmanager +def task(ctx,config): + """ + Following is the way how to run kafka:: + tasks: + - kafka: + client.0: + kafka_version: 2.6.0 + """ + assert config is None or isinstance(config, list) \ + or isinstance(config, dict), \ + "task kafka only supports a list or dictionary for configuration" + + all_clients = ['client.{id}'.format(id=id_) + for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')] + if config is None: + config = all_clients + if isinstance(config, list): + config = dict.fromkeys(config) + + log.debug('Kafka config is %s', config) + + with contextutil.nested( + lambda: install_kafka(ctx=ctx, config=config), + lambda: run_kafka(ctx=ctx, config=config), + lambda: run_admin_cmds(ctx=ctx, config=config), + ): + yield + diff --git a/qa/tasks/notification_tests.py b/qa/tasks/notification_tests.py index b4697a6f797..f1eae3c89c4 100644 --- a/qa/tasks/notification_tests.py +++ b/qa/tasks/notification_tests.py @@ -220,7 +220,7 @@ def run_tests(ctx, config): for client, client_config in config.items(): (remote,) = ctx.cluster.only(client).remotes.keys() - attr = ["!kafka_test", "!data_path_v2_kafka_test", "!amqp_test", "!amqp_ssl_test", "!kafka_security_test", "!modification_required", "!manual_test", "!http_test"] + attr = ["!kafka_test", "!data_path_v2_kafka_test", "!kafka_failover", "!amqp_test", "!amqp_ssl_test", "!kafka_security_test", "!modification_required", "!manual_test", "!http_test"] if 'extra_attr' in client_config: attr = client_config.get('extra_attr') |