author    | Yuval Lifshitz <ylifshit@ibm.com> | 2024-12-26 11:48:54 +0100
committer | GitHub <noreply@github.com> | 2024-12-26 11:48:54 +0100
commit    | 34bfbd07e7a3f82b5bb279201882d80e1cbddc19 (patch)
tree      | b59b927040579c8a56025a0ac47626e648c0ed1b
parent    | Merge pull request #61144 from cyx1231st/wip-seastore-improve-backref (diff)
parent    | rgw: allow bucket notification send message to kafka with multiple brokers (diff)
download  | ceph-34bfbd07e7a3f82b5bb279201882d80e1cbddc19.tar.xz / ceph-34bfbd07e7a3f82b5bb279201882d80e1cbddc19.zip
Merge pull request #60988 from thuvh/feat/kafka_multiple_brokers
rgw: allow bucket notification send message to kafka with multiple brokers
-rw-r--r-- | doc/radosgw/notifications.rst                                       |   4
-rw-r--r-- | qa/suites/rgw/notifications/tasks/kafka_failover/+                  |   0
-rw-r--r-- | qa/suites/rgw/notifications/tasks/kafka_failover/0-install.yaml     |  20
l--------- | qa/suites/rgw/notifications/tasks/kafka_failover/supported-distros  |   1
-rw-r--r-- | qa/suites/rgw/notifications/tasks/kafka_failover/test_kafka.yaml    |   8
-rw-r--r-- | qa/tasks/kafka_failover.py                                          | 244
-rw-r--r-- | qa/tasks/notification_tests.py                                      |   2
-rw-r--r-- | src/rgw/driver/rados/rgw_pubsub_push.cc                             |   3
-rw-r--r-- | src/rgw/rgw_kafka.cc                                                |  23
-rw-r--r-- | src/rgw/rgw_kafka.h                                                 |   3
-rw-r--r-- | src/test/rgw/bucket_notification/requirements.txt                   |   2
-rw-r--r-- | src/test/rgw/bucket_notification/test_bn.py                         |  36
12 files changed, 328 insertions, 18 deletions
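At a glance, the change lets a topic carry a `kafka-brokers` attribute alongside its `kafka://` push endpoint, so notifications can be delivered to a kafka cluster rather than a single broker. A minimal usage sketch against RGW's SNS-compatible API (the endpoint URL, credentials, topic name, and broker addresses below are illustrative assumptions, not values from this PR):

```python
# Sketch only: create a bucket-notification topic whose kafka endpoint can
# fall back to additional brokers. Endpoint URL, credentials, and broker
# addresses are assumptions for illustration.
import boto3

client = boto3.client(
    'sns',
    endpoint_url='http://localhost:8000',   # RGW address (assumed)
    aws_access_key_id='ACCESS_KEY',         # placeholder credentials
    aws_secret_access_key='SECRET_KEY',
    region_name='default',
)

topic_arn = client.create_topic(
    Name='mytopic',
    Attributes={
        'push-endpoint': 'kafka://localhost:9092',  # broker from the URI
        'kafka-ack-level': 'broker',
        # new in this PR: extra brokers appended to the one in the URI
        'kafka-brokers': 'localhost:19092',
    },
)['TopicArn']
print(topic_arn)
```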
diff --git a/doc/radosgw/notifications.rst b/doc/radosgw/notifications.rst
index 05653956be1..897c280facf 100644
--- a/doc/radosgw/notifications.rst
+++ b/doc/radosgw/notifications.rst
@@ -188,6 +188,7 @@ updating, use the name of an existing topic and different endpoint values).
    [&Attributes.entry.15.key=Policy&Attributes.entry.15.value=<policy-JSON-string>]
    [&Attributes.entry.16.key=user-name&Attributes.entry.16.value=<user-name-string>]
    [&Attributes.entry.17.key=password&Attributes.entry.17.value=<password-string>]
+   [&Attributes.entry.18.key=kafka-brokers&Attributes.entry.18.value=<kafka-broker-list>]
 
 Request parameters:
 
@@ -296,6 +297,8 @@ Request parameters:
   - "broker": Messages are considered "delivered" if acked by the broker. (This
     is the default.)
 
+- kafka-brokers: A comma-separated list of host:port of kafka brokers. These brokers (which may include the broker defined in the kafka URI) are appended to the kafka URI to support sending notifications to a kafka cluster.
+
 .. note::
 
     - The key-value pair of a specific parameter need not reside in the same
 
@@ -571,6 +574,7 @@ Valid AttributeName that can be passed:
 - mechanism: may be provided together with user/password (default: ``PLAIN``).
 - kafka-ack-level: No end2end acknowledgement is required. Messages may persist in
   the broker before being delivered to their final destinations.
+- kafka-brokers: Set the endpoint with broker(s) as a comma-separated list of host or host:port (default port 9092).
 
 Notifications
 ~~~~~~~~~~~~~
diff --git a/qa/suites/rgw/notifications/tasks/kafka_failover/+ b/qa/suites/rgw/notifications/tasks/kafka_failover/+
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/qa/suites/rgw/notifications/tasks/kafka_failover/+
diff --git a/qa/suites/rgw/notifications/tasks/kafka_failover/0-install.yaml b/qa/suites/rgw/notifications/tasks/kafka_failover/0-install.yaml
new file mode 100644
index 00000000000..5c83d5c0d23
--- /dev/null
+++ b/qa/suites/rgw/notifications/tasks/kafka_failover/0-install.yaml
@@ -0,0 +1,20 @@
+tasks:
+- install:
+- ceph:
+- openssl_keys:
+- rgw:
+    client.0:
+
+overrides:
+  install:
+    ceph:
+      extra_system_packages:
+        rpm:
+        - java
+        deb:
+        - default-jre
+  ceph:
+    conf:
+      global:
+        osd_min_pg_log_entries: 10
+        osd_max_pg_log_entries: 10
diff --git a/qa/suites/rgw/notifications/tasks/kafka_failover/supported-distros b/qa/suites/rgw/notifications/tasks/kafka_failover/supported-distros
new file mode 120000
index 00000000000..46280a42a96
--- /dev/null
+++ b/qa/suites/rgw/notifications/tasks/kafka_failover/supported-distros
@@ -0,0 +1 @@
+../../.qa/distros/supported-random-distro$/
\ No newline at end of file
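Once the kafka-failover task below has both brokers up (ports 9092 and 19092, per the sed edits in qa/tasks/kafka_failover.py), a quick way to sanity-check the pair from Python is a consumer that bootstraps from both. A sketch, assuming kafka-python (the same client the bucket-notification tests use) and a made-up topic name:

```python
# Sanity-check sketch: confirm a consumer can bootstrap from either broker of
# the failover pair. Assumes kafka-python is installed and both brokers are
# running locally as configured by the task below; the topic name is made up.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'quickstart-events',
    bootstrap_servers=['localhost:9092', 'localhost:19092'],  # both brokers
    auto_offset_reset='earliest',
    consumer_timeout_ms=5000,  # stop iterating after 5s of silence
)

for message in consumer:
    print(message.topic, message.value)
```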
diff --git a/qa/suites/rgw/notifications/tasks/kafka_failover/test_kafka.yaml b/qa/suites/rgw/notifications/tasks/kafka_failover/test_kafka.yaml
new file mode 100644
index 00000000000..01d6fc637de
--- /dev/null
+++ b/qa/suites/rgw/notifications/tasks/kafka_failover/test_kafka.yaml
@@ -0,0 +1,8 @@
+tasks:
+- kafka-failover:
+    client.0:
+      kafka_version: 3.8.1
+- notification-tests:
+    client.0:
+      extra_attr: ["kafka_failover"]
+      rgw_server: client.0
diff --git a/qa/tasks/kafka_failover.py b/qa/tasks/kafka_failover.py
new file mode 100644
index 00000000000..3ca60ab84fc
--- /dev/null
+++ b/qa/tasks/kafka_failover.py
@@ -0,0 +1,244 @@
+"""
+Deploy and configure Kafka for Teuthology
+"""
+import contextlib
+import logging
+import time
+import os
+
+from teuthology import misc as teuthology
+from teuthology import contextutil
+from teuthology.orchestra import run
+
+log = logging.getLogger(__name__)
+
+def get_kafka_version(config):
+    for client, client_config in config.items():
+        if 'kafka_version' in client_config:
+            kafka_version = client_config.get('kafka_version')
+            return kafka_version
+
+kafka_prefix = 'kafka_2.13-'
+
+def get_kafka_dir(ctx, config):
+    kafka_version = get_kafka_version(config)
+    current_version = kafka_prefix + kafka_version
+    return '{tdir}/{ver}'.format(tdir=teuthology.get_testdir(ctx), ver=current_version)
+
+
+@contextlib.contextmanager
+def install_kafka(ctx, config):
+    """
+    Download the Kafka tarball and prepare the config for a second broker.
+    """
+    assert isinstance(config, dict)
+    log.info('Installing Kafka...')
+
+    # programmatically find a nearby mirror so as not to hammer archive.apache.org
+    apache_mirror_cmd = "curl 'https://www.apache.org/dyn/closer.cgi' 2>/dev/null | " \
+        "grep -o '<strong>[^<]*</strong>' | sed 's/<[^>]*>//g' | head -n 1"
+    log.info("determining apache mirror by running: " + apache_mirror_cmd)
+    apache_mirror_url_front = os.popen(apache_mirror_cmd).read().rstrip()  # note: includes trailing slash (/)
+    log.info("chosen apache mirror is " + apache_mirror_url_front)
+
+    for (client, _) in config.items():
+        (remote,) = ctx.cluster.only(client).remotes.keys()
+        test_dir = teuthology.get_testdir(ctx)
+        current_version = get_kafka_version(config)
+
+        kafka_file = kafka_prefix + current_version + '.tgz'
+
+        link1 = '{apache_mirror_url_front}/kafka/'.format(apache_mirror_url_front=apache_mirror_url_front) + \
+            current_version + '/' + kafka_file
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', link1],
+        )
+
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', kafka_file],
+        )
+
+        kafka_dir = get_kafka_dir(ctx, config)
+        # create config for second broker
+        second_broker_config_name = "server2.properties"
+        second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir)
+        second_broker_data_logs_escaped = "{}/logs".format(second_broker_data).replace("/", "\/")
+
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}'.format(tdir=kafka_dir), run.Raw('&&'),
+                  'cp', '{tdir}/config/server.properties'.format(tdir=kafka_dir),
+                  '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name),
+                  run.Raw('&&'),
+                  'mkdir', '-p', '{tdir}/data'.format(tdir=kafka_dir)
+                  ],
+        )
+
+        # edit config: give the second broker its own id, port (19092) and log dir
+        ctx.cluster.only(client).run(
+            args=['sed', '-i', 's/broker.id=0/broker.id=1/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+                  'sed', '-i', 's/#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+                  'sed', '-i', 's/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+                  'sed', '-i', 's/log.dirs=\/tmp\/kafka-logs/log.dirs={}/g'.format(second_broker_data_logs_escaped), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+                  'cat', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name)
+                  ]
+        )
+
+    try:
+        yield
+    finally:
+        log.info('Removing packaged dependencies of Kafka...')
+        test_dir = get_kafka_dir(ctx, config)
+        current_version = get_kafka_version(config)
+        for (client, _) in config.items():
+            ctx.cluster.only(client).run(
+                args=['rm', '-rf', '{tdir}/logs'.format(tdir=test_dir)],
+            )
+
+            ctx.cluster.only(client).run(
+                args=['rm', '-rf', test_dir],
+            )
+
+            ctx.cluster.only(client).run(
+                args=['rm', '-rf', '{tdir}/{doc}'.format(tdir=teuthology.get_testdir(ctx), doc=kafka_file)],
+            )
+
+
+@contextlib.contextmanager
+def run_kafka(ctx, config):
+    """
+    This includes three parts:
+    1. Starting the Zookeeper service
+    2. Starting the first Kafka broker (port 9092)
+    3. Starting the second Kafka broker (port 19092)
+    """
+    assert isinstance(config, dict)
+    log.info('Bringing up Zookeeper and Kafka services...')
+    for (client, _) in config.items():
+        (remote,) = ctx.cluster.only(client).remotes.keys()
+        kafka_dir = get_kafka_dir(ctx, config)
+
+        second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir)
+        second_broker_java_log_dir = "{}/java_logs".format(second_broker_data)
+
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+                  './zookeeper-server-start.sh',
+                  '{tir}/config/zookeeper.properties'.format(tir=kafka_dir),
+                  run.Raw('&'), 'exit'
+                  ],
+        )
+
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+                  './kafka-server-start.sh',
+                  '{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)),
+                  run.Raw('&'), 'exit'
+                  ],
+        )
+
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+                  run.Raw('LOG_DIR={second_broker_java_log_dir}'.format(second_broker_java_log_dir=second_broker_java_log_dir)),
+                  './kafka-server-start.sh', '{tdir}/config/server2.properties'.format(tdir=kafka_dir),
+                  run.Raw('&'), 'exit'
+                  ],
+        )
+
+    try:
+        yield
+    finally:
+        log.info('Stopping Zookeeper and Kafka Services...')
+
+        for (client, _) in config.items():
+            (remote,) = ctx.cluster.only(client).remotes.keys()
+
+            ctx.cluster.only(client).run(
+                args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+                      './kafka-server-stop.sh',
+                      '{tir}/config/kafka.properties'.format(tir=get_kafka_dir(ctx, config)),
+                      ],
+            )
+
+            time.sleep(5)
+
+            ctx.cluster.only(client).run(
+                args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+                      './zookeeper-server-stop.sh',
+                      '{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
+                      ],
+            )
+
+            time.sleep(5)
+
+            ctx.cluster.only(client).run(args=['killall', '-9', 'java'])
+
+
+@contextlib.contextmanager
+def run_admin_cmds(ctx, config):
+    """
+    Run Kafka admin commands to verify topic creation and check that the
+    producer and consumer work.
+    """
+    assert isinstance(config, dict)
+    log.info('Checking kafka server through producer/consumer commands...')
+    for (client, _) in config.items():
+        (remote,) = ctx.cluster.only(client).remotes.keys()
+
+        ctx.cluster.only(client).run(
+            args=[
+                'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+                './kafka-topics.sh', '--create', '--topic', 'quickstart-events',
+                '--bootstrap-server', 'localhost:9092'
+            ],
+        )
+
+        ctx.cluster.only(client).run(
+            args=[
+                'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+                'echo', "First", run.Raw('|'),
+                './kafka-console-producer.sh', '--topic', 'quickstart-events',
+                '--bootstrap-server', 'localhost:9092'
+            ],
+        )
+
+        ctx.cluster.only(client).run(
+            args=[
+                'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+                './kafka-console-consumer.sh', '--topic', 'quickstart-events',
+                '--from-beginning',
+                '--bootstrap-server', 'localhost:9092',
+                run.Raw('&'), 'exit'
+            ],
+        )
+
+    try:
+        yield
+    finally:
+        pass
+
+
+@contextlib.contextmanager
+def task(ctx, config):
+    """
+    Following is the way to run the kafka-failover task::
+
+        tasks:
+        - kafka-failover:
+            client.0:
+              kafka_version: 2.6.0
+    """
+    assert config is None or isinstance(config, list) \
+        or isinstance(config, dict), \
+        "task kafka-failover only supports a list or dictionary for configuration"
+
+    all_clients = ['client.{id}'.format(id=id_)
+                   for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')]
+    if config is None:
+        config = all_clients
+    if isinstance(config, list):
+        config = dict.fromkeys(config)
+
+    log.debug('Kafka config is %s', config)
+
+    with contextutil.nested(
+        lambda: install_kafka(ctx=ctx, config=config),
+        lambda: run_kafka(ctx=ctx, config=config),
+        lambda: run_admin_cmds(ctx=ctx, config=config),
+    ):
+        yield
diff --git a/qa/tasks/notification_tests.py b/qa/tasks/notification_tests.py
index b4697a6f797..f1eae3c89c4 100644
--- a/qa/tasks/notification_tests.py
+++ b/qa/tasks/notification_tests.py
@@ -220,7 +220,7 @@ def run_tests(ctx, config):
     for client, client_config in config.items():
         (remote,) = ctx.cluster.only(client).remotes.keys()
 
-        attr = ["!kafka_test", "!data_path_v2_kafka_test", "!amqp_test", "!amqp_ssl_test", "!kafka_security_test", "!modification_required", "!manual_test", "!http_test"]
+        attr = ["!kafka_test", "!data_path_v2_kafka_test", "!kafka_failover", "!amqp_test", "!amqp_ssl_test", "!kafka_security_test", "!modification_required", "!manual_test", "!http_test"]
 
         if 'extra_attr' in client_config:
             attr = client_config.get('extra_attr')
diff --git a/src/rgw/driver/rados/rgw_pubsub_push.cc b/src/rgw/driver/rados/rgw_pubsub_push.cc
index 07d65fa1028..d22c61e9b08 100644
--- a/src/rgw/driver/rados/rgw_pubsub_push.cc
+++ b/src/rgw/driver/rados/rgw_pubsub_push.cc
@@ -281,7 +281,7 @@ public:
           conn_id, _endpoint, get_bool(args, "use-ssl", false),
           get_bool(args, "verify-ssl", true), args.get_optional("ca-location"),
           args.get_optional("mechanism"), args.get_optional("user-name"),
-          args.get_optional("password"))) {
+          args.get_optional("password"), args.get_optional("kafka-brokers"))) {
       throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
     }
@@ -434,4 +434,3 @@ void RGWPubSubEndpoint::shutdown_all() {
 #endif
   shutdown_http_manager();
 }
-
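The endpoint layer above only forwards the optional `kafka-brokers` topic attribute; the rgw_kafka.cc hunk below does the actual merge, appending the attribute to the broker parsed out of the endpoint URI. The same logic rendered in Python as a sketch (the function name is mine, not RGW's):

```python
# Illustrative Python rendering of the merge done in rgw_kafka.cc below:
# the broker from the kafka:// URI comes first, then the optional
# kafka-brokers attribute is appended verbatim. Name is hypothetical.
from typing import Optional

def build_broker_list(uri_broker: str, kafka_brokers: Optional[str]) -> str:
    """Combine the URI's broker with the optional kafka-brokers attribute."""
    if kafka_brokers is not None:
        return uri_broker + ',' + kafka_brokers
    return uri_broker

assert build_broker_list('localhost:9092', 'localhost:19092') == 'localhost:9092,localhost:19092'
assert build_broker_list('localhost:9092', None) == 'localhost:9092'
```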
diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc
index 0807993338d..b38b1a78ec4 100644
--- a/src/rgw/rgw_kafka.cc
+++ b/src/rgw/rgw_kafka.cc
@@ -13,6 +13,7 @@
 #include <thread>
 #include <atomic>
 #include <mutex>
+#include <boost/algorithm/string.hpp>
 #include <boost/functional/hash.hpp>
 #include <boost/lockfree/queue.hpp>
 #include "common/dout.h"
@@ -595,7 +596,8 @@ public:
                boost::optional<const std::string&> ca_location,
                boost::optional<const std::string&> mechanism,
                boost::optional<const std::string&> topic_user_name,
-               boost::optional<const std::string&> topic_password) {
+               boost::optional<const std::string&> topic_password,
+               boost::optional<const std::string&> brokers) {
     if (stopped) {
       ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
       return false;
@@ -603,8 +605,8 @@
 
     std::string user;
     std::string password;
-    std::string broker;
-    if (!parse_url_authority(url, broker, user, password)) {
+    std::string broker_list;
+    if (!parse_url_authority(url, broker_list, user, password)) {
       // TODO: increment counter
       ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
       return false;
@@ -632,7 +634,13 @@
       ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl;
       return false;
     }
-    connection_id_t tmp_id(broker, user, password, ca_location, mechanism,
+
+    if (brokers.has_value()) {
+      broker_list.append(",");
+      broker_list.append(brokers.get());
+    }
+
+    connection_id_t tmp_id(broker_list, user, password, ca_location, mechanism,
         use_ssl);
     std::lock_guard lock(connections_lock);
     const auto it = connections.find(tmp_id);
@@ -652,7 +660,7 @@
       return false;
     }
 
-    auto conn = std::make_unique<connection_t>(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism);
+    auto conn = std::make_unique<connection_t>(cct, broker_list, use_ssl, verify_ssl, ca_location, user, password, mechanism);
     if (!new_producer(conn.get())) {
       ldout(cct, 10) << "Kafka connect: producer creation failed in new connection" << dendl;
       return false;
@@ -770,11 +778,12 @@ bool connect(connection_id_t& conn_id,
              boost::optional<const std::string&> ca_location,
              boost::optional<const std::string&> mechanism,
              boost::optional<const std::string&> user_name,
-             boost::optional<const std::string&> password) {
+             boost::optional<const std::string&> password,
+             boost::optional<const std::string&> brokers) {
   std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return false;
   return s_manager->connect(conn_id, url, use_ssl, verify_ssl, ca_location,
-                            mechanism, user_name, password);
+                            mechanism, user_name, password, brokers);
 }
 
 int publish(const connection_id_t& conn_id,
diff --git a/src/rgw/rgw_kafka.h b/src/rgw/rgw_kafka.h
index b7aa0d15759..858b185219f 100644
--- a/src/rgw/rgw_kafka.h
+++ b/src/rgw/rgw_kafka.h
@@ -48,7 +48,8 @@ bool connect(connection_id_t& conn_id,
              boost::optional<const std::string&> ca_location,
              boost::optional<const std::string&> mechanism,
              boost::optional<const std::string&> user_name,
-             boost::optional<const std::string&> password);
+             boost::optional<const std::string&> password,
+             boost::optional<const std::string&> brokers);
 
 // publish a message over a connection that was already created
 int publish(const connection_id_t& conn_id,
diff --git a/src/test/rgw/bucket_notification/requirements.txt b/src/test/rgw/bucket_notification/requirements.txt
index a3cff2bedab..bb74eceedc3 100644
--- a/src/test/rgw/bucket_notification/requirements.txt
+++ b/src/test/rgw/bucket_notification/requirements.txt
@@ -1,4 +1,4 @@
-nose >=1.0.0
+nose-py3 >=1.0.0
 boto >=2.6.0
 boto3 >=1.0.0
 configparser >=5.0.0
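One consequence of the rgw_kafka.cc change worth noting: the connection cache key (`connection_id_t`) is now built from the merged broker list, so two topics pointing at the same URI but with different `kafka-brokers` values get distinct producers. A toy model of that keying (the field set is simplified, not the real C++ struct):

```python
# Toy model of the connection-cache keying after this change: the key holds
# the merged broker list, so differing kafka-brokers values yield distinct
# cached producers. Simplified relative to the real connection_id_t.
from dataclasses import dataclass

@dataclass(frozen=True)
class ConnectionId:
    broker_list: str   # e.g. "localhost:9092,localhost:19092"
    user: str = ''
    use_ssl: bool = False

connections: dict[ConnectionId, object] = {}

def get_connection(broker_list: str) -> object:
    key = ConnectionId(broker_list)
    if key not in connections:
        connections[key] = object()  # stand-in for a librdkafka producer
    return connections[key]

# same merged list reuses the producer; a different list creates a new one
assert get_connection('a:9092') is get_connection('a:9092')
assert get_connection('a:9092') is not get_connection('a:9092,b:19092')
```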
diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py
index 90ee33617fe..83d66b77b4c 100644
--- a/src/test/rgw/bucket_notification/test_bn.py
+++ b/src/test/rgw/bucket_notification/test_bn.py
@@ -410,17 +410,25 @@ kafka_server = 'localhost'
 
 class KafkaReceiver(object):
     """class for receiving and storing messages on a topic from the kafka broker"""
-    def __init__(self, topic, security_type):
+    def __init__(self, topic, security_type, kafka_server='localhost'):
         from kafka import KafkaConsumer
         remaining_retries = 10
         port = 9092
         if security_type != 'PLAINTEXT':
             security_type = 'SSL'
             port = 9093
+
+        if kafka_server is None:
+            endpoint = "localhost" + ":" + str(port)
+        elif ":" not in kafka_server:
+            endpoint = kafka_server + ":" + str(port)
+        else:
+            endpoint = kafka_server
+
         while remaining_retries > 0:
             try:
                 self.consumer = KafkaConsumer(topic,
-                                              bootstrap_servers = kafka_server+':'+str(port),
+                                              bootstrap_servers=endpoint,
                                               security_protocol=security_type,
                                               consumer_timeout_ms=16000,
                                               auto_offset_reset='earliest')
@@ -468,9 +476,9 @@ def kafka_receiver_thread_runner(receiver):
         print('Kafka receiver ended unexpectedly: ' + str(error))
 
 
-def create_kafka_receiver_thread(topic, security_type='PLAINTEXT'):
+def create_kafka_receiver_thread(topic, security_type='PLAINTEXT', kafka_brokers=None):
     """create kafka receiver and thread"""
-    receiver = KafkaReceiver(topic, security_type)
+    receiver = KafkaReceiver(topic, security_type, kafka_server=kafka_brokers)
     task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,))
     task.daemon = True
     return task, receiver
@@ -1304,7 +1312,7 @@ def test_ps_s3_notification_errors_on_master():
     conn.delete_bucket(bucket_name)
 
 
-def notification_push(endpoint_type, conn, account=None, cloudevents=False):
+def notification_push(endpoint_type, conn, account=None, cloudevents=False, kafka_brokers=None):
     """ test pushing notifications """
     zonegroup = get_config_zonegroup()
     # create bucket
@@ -1359,11 +1367,13 @@ def notification_push(endpoint_type, conn, account=None, cloudevents=False):
         assert_equal(status/100, 2)
     elif endpoint_type == 'kafka':
         # start kafka receiver
-        task, receiver = create_kafka_receiver_thread(topic_name)
+        task, receiver = create_kafka_receiver_thread(topic_name, kafka_brokers=kafka_brokers)
         task.start()
         endpoint_address = 'kafka://' + kafka_server
         # without acks from broker
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
+        if kafka_brokers is not None:
+            endpoint_args += '&kafka-brokers=' + kafka_brokers
         # create s3 topic
         topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
         topic_arn = topic_conf.set_config()
@@ -1581,6 +1591,20 @@ def test_notification_push_kafka():
     notification_push('kafka', conn)
 
 
+@attr('kafka_failover')
+def test_notification_push_kafka_multiple_brokers_override():
+    """ test pushing kafka s3 notification on master, with both brokers listed in kafka-brokers """
+    conn = connection()
+    notification_push('kafka', conn, kafka_brokers='localhost:9092,localhost:19092')
+
+
+@attr('kafka_failover')
+def test_notification_push_kafka_multiple_brokers_append():
+    """ test pushing kafka s3 notification on master, appending a second broker to the endpoint URI """
+    conn = connection()
+    notification_push('kafka', conn, kafka_brokers='localhost:19092')
+
+
 @attr('http_test')
 def test_ps_s3_notification_multi_delete_on_master():
     """ test deletion of multiple keys on master """
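For completeness: the two new tests are gated by the `kafka_failover` attribute that the teuthology fragment passes via `extra_attr`. Run outside teuthology, they could be selected the same way; the invocation below is a sketch that assumes nose's attrib plugin (which these tests already rely on) and the in-tree test path:

```python
# Sketch: select only the failover tests by their nose attribute, mirroring
# what the notification-tests task does with extra_attr: ["kafka_failover"].
# Assumes the nose/nose-py3 attrib plugin and the repo-relative test path.
import subprocess

subprocess.run(
    ['nosetests', '-v', '-a', 'kafka_failover',
     'src/test/rgw/bucket_notification/test_bn.py'],
    check=True,
)
```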