author    Yuval Lifshitz <ylifshit@ibm.com>  2024-12-26 11:48:54 +0100
committer GitHub <noreply@github.com>  2024-12-26 11:48:54 +0100
commit    34bfbd07e7a3f82b5bb279201882d80e1cbddc19 (patch)
tree      b59b927040579c8a56025a0ac47626e648c0ed1b
parent    Merge pull request #61144 from cyx1231st/wip-seastore-improve-backref (diff)
parent    rgw: allow bucket notification send message to kafka with multiple brokers (diff)
Merge pull request #60988 from thuvh/feat/kafka_multiple_brokers
rgw: allow bucket notification send message to kafka with multiple brokers
-rw-r--r--doc/radosgw/notifications.rst4
-rw-r--r--qa/suites/rgw/notifications/tasks/kafka_failover/+0
-rw-r--r--qa/suites/rgw/notifications/tasks/kafka_failover/0-install.yaml20
l---------qa/suites/rgw/notifications/tasks/kafka_failover/supported-distros1
-rw-r--r--qa/suites/rgw/notifications/tasks/kafka_failover/test_kafka.yaml8
-rw-r--r--qa/tasks/kafka_failover.py244
-rw-r--r--qa/tasks/notification_tests.py2
-rw-r--r--src/rgw/driver/rados/rgw_pubsub_push.cc3
-rw-r--r--src/rgw/rgw_kafka.cc23
-rw-r--r--src/rgw/rgw_kafka.h3
-rw-r--r--src/test/rgw/bucket_notification/requirements.txt2
-rw-r--r--src/test/rgw/bucket_notification/test_bn.py36
12 files changed, 328 insertions, 18 deletions
diff --git a/doc/radosgw/notifications.rst b/doc/radosgw/notifications.rst
index 05653956be1..897c280facf 100644
--- a/doc/radosgw/notifications.rst
+++ b/doc/radosgw/notifications.rst
@@ -188,6 +188,7 @@ updating, use the name of an existing topic and different endpoint values).
[&Attributes.entry.15.key=Policy&Attributes.entry.15.value=<policy-JSON-string>]
[&Attributes.entry.16.key=user-name&Attributes.entry.16.value=<user-name-string>]
[&Attributes.entry.17.key=password&Attributes.entry.17.value=<password-string>]
+ [&Attributes.entry.18.key=kafka-brokers&Attributes.entry.18.value=<kafka-broker-list>]
Request parameters:
@@ -296,6 +297,8 @@ Request parameters:
- "broker": Messages are considered "delivered" if acked by the broker. (This
is the default.)
+ - kafka-brokers: A comma-separated list of host:port entries for Kafka brokers. These brokers (which may include the broker defined in the Kafka URI) are added to the Kafka URI to support sending notifications to a Kafka cluster.
+
.. note::
- The key-value pair of a specific parameter need not reside in the same
@@ -571,6 +574,7 @@ Valid AttributeName that can be passed:
- mechanism: may be provided together with user/password (default: ``PLAIN``).
- kafka-ack-level: No end2end acknowledgement is required. Messages may persist in the
broker before being delivered to their final destinations.
+ - kafka-brokers: Append broker(s) to the endpoint as a comma-separated list of host or host:port entries (default port: 9092).
Notifications
~~~~~~~~~~~~~
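
[Editor's note] For illustration, a topic carrying the new kafka-brokers attribute can be created through RGW's SNS-compatible API; a minimal boto3 sketch (the endpoint URL, credentials, and broker addresses below are placeholders, not values from this change):

    import boto3

    # RGW exposes an SNS-compatible API for topic management
    sns = boto3.client('sns',
                       endpoint_url='http://localhost:8000',  # hypothetical RGW endpoint
                       aws_access_key_id='ACCESS_KEY',
                       aws_secret_access_key='SECRET_KEY',
                       region_name='default')

    # kafka-brokers supplements the broker named in the push-endpoint URI,
    # so notifications can still reach the cluster if that broker is down
    sns.create_topic(Name='mytopic', Attributes={
        'push-endpoint': 'kafka://kafka1:9092',
        'kafka-brokers': 'kafka2:9092,kafka3:9092',
    })
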
diff --git a/qa/suites/rgw/notifications/tasks/kafka_failover/+ b/qa/suites/rgw/notifications/tasks/kafka_failover/+
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/qa/suites/rgw/notifications/tasks/kafka_failover/+
diff --git a/qa/suites/rgw/notifications/tasks/kafka_failover/0-install.yaml b/qa/suites/rgw/notifications/tasks/kafka_failover/0-install.yaml
new file mode 100644
index 00000000000..5c83d5c0d23
--- /dev/null
+++ b/qa/suites/rgw/notifications/tasks/kafka_failover/0-install.yaml
@@ -0,0 +1,20 @@
+tasks:
+- install:
+- ceph:
+- openssl_keys:
+- rgw:
+ client.0:
+
+overrides:
+ install:
+ ceph:
+ extra_system_packages:
+ rpm:
+ - java
+ deb:
+ - default-jre
+ ceph:
+ conf:
+ global:
+ osd_min_pg_log_entries: 10
+ osd_max_pg_log_entries: 10
diff --git a/qa/suites/rgw/notifications/tasks/kafka_failover/supported-distros b/qa/suites/rgw/notifications/tasks/kafka_failover/supported-distros
new file mode 120000
index 00000000000..46280a42a96
--- /dev/null
+++ b/qa/suites/rgw/notifications/tasks/kafka_failover/supported-distros
@@ -0,0 +1 @@
+../../.qa/distros/supported-random-distro$/ \ No newline at end of file
diff --git a/qa/suites/rgw/notifications/tasks/kafka_failover/test_kafka.yaml b/qa/suites/rgw/notifications/tasks/kafka_failover/test_kafka.yaml
new file mode 100644
index 00000000000..01d6fc637de
--- /dev/null
+++ b/qa/suites/rgw/notifications/tasks/kafka_failover/test_kafka.yaml
@@ -0,0 +1,8 @@
+tasks:
+- kafka-failover:
+ client.0:
+ kafka_version: 3.8.1
+- notification-tests:
+ client.0:
+ extra_attr: ["kafka_failover"]
+ rgw_server: client.0
diff --git a/qa/tasks/kafka_failover.py b/qa/tasks/kafka_failover.py
new file mode 100644
index 00000000000..3ca60ab84fc
--- /dev/null
+++ b/qa/tasks/kafka_failover.py
@@ -0,0 +1,244 @@
+"""
+Deploy and configure Kafka for Teuthology
+"""
+import contextlib
+import logging
+import time
+import os
+
+from teuthology import misc as teuthology
+from teuthology import contextutil
+from teuthology.orchestra import run
+
+log = logging.getLogger(__name__)
+
+def get_kafka_version(config):
+ for client, client_config in config.items():
+ if 'kafka_version' in client_config:
+ kafka_version = client_config.get('kafka_version')
+ return kafka_version
+
+kafka_prefix = 'kafka_2.13-'
+
+def get_kafka_dir(ctx, config):
+ kafka_version = get_kafka_version(config)
+ current_version = kafka_prefix + kafka_version
+ return '{tdir}/{ver}'.format(tdir=teuthology.get_testdir(ctx),ver=current_version)
+
+
+@contextlib.contextmanager
+def install_kafka(ctx, config):
+ """
+    Download and unpack the Kafka tarball, and prepare a config for a second broker.
+ """
+ assert isinstance(config, dict)
+ log.info('Installing Kafka...')
+
+ # programmatically find a nearby mirror so as not to hammer archive.apache.org
+ apache_mirror_cmd="curl 'https://www.apache.org/dyn/closer.cgi' 2>/dev/null | " \
+ "grep -o '<strong>[^<]*</strong>' | sed 's/<[^>]*>//g' | head -n 1"
+ log.info("determining apache mirror by running: " + apache_mirror_cmd)
+ apache_mirror_url_front = os.popen(apache_mirror_cmd).read().rstrip() # note: includes trailing slash (/)
+ log.info("chosen apache mirror is " + apache_mirror_url_front)
+
+ for (client, _) in config.items():
+ (remote,) = ctx.cluster.only(client).remotes.keys()
+ test_dir=teuthology.get_testdir(ctx)
+ current_version = get_kafka_version(config)
+
+ kafka_file = kafka_prefix + current_version + '.tgz'
+
+ link1 = '{apache_mirror_url_front}/kafka/'.format(apache_mirror_url_front=apache_mirror_url_front) + \
+ current_version + '/' + kafka_file
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', link1],
+ )
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', kafka_file],
+ )
+
+ kafka_dir = get_kafka_dir(ctx, config)
+ # create config for second broker
+ second_broker_config_name = "server2.properties"
+ second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir)
+    second_broker_data_logs_escaped = "{}/logs".format(second_broker_data).replace("/", r"\/")
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}'.format(tdir=kafka_dir), run.Raw('&&'),
+ 'cp', '{tdir}/config/server.properties'.format(tdir=kafka_dir), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+ 'mkdir', '-p', '{tdir}/data'.format(tdir=kafka_dir)
+ ],
+ )
+
+ # edit config
+ ctx.cluster.only(client).run(
+ args=['sed', '-i', 's/broker.id=0/broker.id=1/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+          'sed', '-i', r's/#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+          'sed', '-i', r's/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+          'sed', '-i', r's/log.dirs=\/tmp\/kafka-logs/log.dirs={}/g'.format(second_broker_data_logs_escaped), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+ 'cat', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name)
+ ]
+ )
+
+ try:
+ yield
+ finally:
+ log.info('Removing packaged dependencies of Kafka...')
+ test_dir=get_kafka_dir(ctx, config)
+ current_version = get_kafka_version(config)
+ for (client,_) in config.items():
+ ctx.cluster.only(client).run(
+ args=['rm', '-rf', '{tdir}/logs'.format(tdir=test_dir)],
+ )
+
+ ctx.cluster.only(client).run(
+ args=['rm', '-rf', test_dir],
+ )
+
+ ctx.cluster.only(client).run(
+ args=['rm', '-rf', '{tdir}/{doc}'.format(tdir=teuthology.get_testdir(ctx),doc=kafka_file)],
+ )
+
+
+@contextlib.contextmanager
+def run_kafka(ctx,config):
+ """
+    This includes two parts:
+    1. Starting the ZooKeeper service
+    2. Starting two Kafka brokers (the second broker is used for failover testing)
+ """
+ assert isinstance(config, dict)
+ log.info('Bringing up Zookeeper and Kafka services...')
+ for (client,_) in config.items():
+ (remote,) = ctx.cluster.only(client).remotes.keys()
+ kafka_dir = get_kafka_dir(ctx, config)
+
+ second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir)
+ second_broker_java_log_dir = "{}/java_logs".format(second_broker_data)
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+ './zookeeper-server-start.sh',
+ '{tir}/config/zookeeper.properties'.format(tir=kafka_dir),
+ run.Raw('&'), 'exit'
+ ],
+ )
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+ './kafka-server-start.sh',
+ '{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)),
+ run.Raw('&'), 'exit'
+ ],
+ )
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+ run.Raw('LOG_DIR={second_broker_java_log_dir}'.format(second_broker_java_log_dir=second_broker_java_log_dir)),
+ './kafka-server-start.sh', '{tdir}/config/server2.properties'.format(tdir=kafka_dir),
+ run.Raw('&'), 'exit'
+ ],
+ )
+
+ try:
+ yield
+ finally:
+ log.info('Stopping Zookeeper and Kafka Services...')
+
+ for (client, _) in config.items():
+ (remote,) = ctx.cluster.only(client).remotes.keys()
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './kafka-server-stop.sh',
+                '{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)),
+ ],
+ )
+
+ time.sleep(5)
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './zookeeper-server-stop.sh',
+ '{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
+ ],
+ )
+
+ time.sleep(5)
+
+ ctx.cluster.only(client).run(args=['killall', '-9', 'java'])
+
+
+@contextlib.contextmanager
+def run_admin_cmds(ctx,config):
+ """
+    Run Kafka admin commands to create a topic and verify that the producer and consumer work.
+ """
+ assert isinstance(config, dict)
+ log.info('Checking kafka server through producer/consumer commands...')
+ for (client,_) in config.items():
+ (remote,) = ctx.cluster.only(client).remotes.keys()
+
+ ctx.cluster.only(client).run(
+ args=[
+ 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './kafka-topics.sh', '--create', '--topic', 'quickstart-events',
+ '--bootstrap-server', 'localhost:9092'
+ ],
+ )
+
+ ctx.cluster.only(client).run(
+ args=[
+ 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ 'echo', "First", run.Raw('|'),
+ './kafka-console-producer.sh', '--topic', 'quickstart-events',
+ '--bootstrap-server', 'localhost:9092'
+ ],
+ )
+
+ ctx.cluster.only(client).run(
+ args=[
+ 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './kafka-console-consumer.sh', '--topic', 'quickstart-events',
+ '--from-beginning',
+ '--bootstrap-server', 'localhost:9092',
+ run.Raw('&'), 'exit'
+ ],
+ )
+
+ try:
+ yield
+ finally:
+ pass
+
+
+@contextlib.contextmanager
+def task(ctx,config):
+ """
+    Following is an example of how to run the kafka-failover task::
+    tasks:
+    - kafka-failover:
+ client.0:
+ kafka_version: 2.6.0
+ """
+ assert config is None or isinstance(config, list) \
+ or isinstance(config, dict), \
+ "task kafka only supports a list or dictionary for configuration"
+
+ all_clients = ['client.{id}'.format(id=id_)
+ for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')]
+ if config is None:
+ config = all_clients
+ if isinstance(config, list):
+ config = dict.fromkeys(config)
+
+ log.debug('Kafka config is %s', config)
+
+ with contextutil.nested(
+ lambda: install_kafka(ctx=ctx, config=config),
+ lambda: run_kafka(ctx=ctx, config=config),
+ lambda: run_admin_cmds(ctx=ctx, config=config),
+ ):
+ yield
+
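[Editor's note] The sed pipeline in install_kafka() boils down to four property overrides for the second broker. A minimal sketch of the same edit in plain Python (assuming the stock server.properties keys shipped with Kafka; paths are illustrative):

    # derive server2.properties from the stock server.properties
    overrides = {
        'broker.id=0': 'broker.id=1',
        '#listeners=PLAINTEXT://:9092': 'listeners=PLAINTEXT://localhost:19092',
        '#advertised.listeners=PLAINTEXT://your.host.name:9092':
            'advertised.listeners=PLAINTEXT://localhost:19092',
        'log.dirs=/tmp/kafka-logs': 'log.dirs=/path/to/data/broker02/logs',
    }

    with open('config/server.properties') as f:
        conf = f.read()
    for old, new in overrides.items():
        conf = conf.replace(old, new)  # same substitutions the sed commands perform
    with open('config/server2.properties', 'w') as f:
        f.write(conf)
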
diff --git a/qa/tasks/notification_tests.py b/qa/tasks/notification_tests.py
index b4697a6f797..f1eae3c89c4 100644
--- a/qa/tasks/notification_tests.py
+++ b/qa/tasks/notification_tests.py
@@ -220,7 +220,7 @@ def run_tests(ctx, config):
for client, client_config in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
- attr = ["!kafka_test", "!data_path_v2_kafka_test", "!amqp_test", "!amqp_ssl_test", "!kafka_security_test", "!modification_required", "!manual_test", "!http_test"]
+ attr = ["!kafka_test", "!data_path_v2_kafka_test", "!kafka_failover", "!amqp_test", "!amqp_ssl_test", "!kafka_security_test", "!modification_required", "!manual_test", "!http_test"]
if 'extra_attr' in client_config:
attr = client_config.get('extra_attr')
diff --git a/src/rgw/driver/rados/rgw_pubsub_push.cc b/src/rgw/driver/rados/rgw_pubsub_push.cc
index 07d65fa1028..d22c61e9b08 100644
--- a/src/rgw/driver/rados/rgw_pubsub_push.cc
+++ b/src/rgw/driver/rados/rgw_pubsub_push.cc
@@ -281,7 +281,7 @@ public:
conn_id, _endpoint, get_bool(args, "use-ssl", false),
get_bool(args, "verify-ssl", true), args.get_optional("ca-location"),
args.get_optional("mechanism"), args.get_optional("user-name"),
- args.get_optional("password"))) {
+ args.get_optional("password"), args.get_optional("kafka-brokers"))) {
throw configuration_error("Kafka: failed to create connection to: " +
_endpoint);
}
@@ -434,4 +434,3 @@ void RGWPubSubEndpoint::shutdown_all() {
#endif
shutdown_http_manager();
}
-
diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc
index 0807993338d..b38b1a78ec4 100644
--- a/src/rgw/rgw_kafka.cc
+++ b/src/rgw/rgw_kafka.cc
@@ -13,6 +13,7 @@
#include <thread>
#include <atomic>
#include <mutex>
+#include <boost/algorithm/string.hpp>
#include <boost/functional/hash.hpp>
#include <boost/lockfree/queue.hpp>
#include "common/dout.h"
@@ -595,7 +596,8 @@ public:
boost::optional<const std::string&> ca_location,
boost::optional<const std::string&> mechanism,
boost::optional<const std::string&> topic_user_name,
- boost::optional<const std::string&> topic_password) {
+ boost::optional<const std::string&> topic_password,
+ boost::optional<const std::string&> brokers) {
if (stopped) {
ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
return false;
@@ -603,8 +605,8 @@ public:
std::string user;
std::string password;
- std::string broker;
- if (!parse_url_authority(url, broker, user, password)) {
+ std::string broker_list;
+ if (!parse_url_authority(url, broker_list, user, password)) {
// TODO: increment counter
ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
return false;
@@ -632,7 +634,13 @@ public:
ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl;
return false;
}
- connection_id_t tmp_id(broker, user, password, ca_location, mechanism,
+
+ if (brokers.has_value()) {
+ broker_list.append(",");
+ broker_list.append(brokers.get());
+ }
+
+ connection_id_t tmp_id(broker_list, user, password, ca_location, mechanism,
use_ssl);
std::lock_guard lock(connections_lock);
const auto it = connections.find(tmp_id);
@@ -652,7 +660,7 @@ public:
return false;
}
- auto conn = std::make_unique<connection_t>(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism);
+ auto conn = std::make_unique<connection_t>(cct, broker_list, use_ssl, verify_ssl, ca_location, user, password, mechanism);
if (!new_producer(conn.get())) {
ldout(cct, 10) << "Kafka connect: producer creation failed in new connection" << dendl;
return false;
@@ -770,11 +778,12 @@ bool connect(connection_id_t& conn_id,
boost::optional<const std::string&> ca_location,
boost::optional<const std::string&> mechanism,
boost::optional<const std::string&> user_name,
- boost::optional<const std::string&> password) {
+ boost::optional<const std::string&> password,
+ boost::optional<const std::string&> brokers) {
std::shared_lock lock(s_manager_mutex);
if (!s_manager) return false;
return s_manager->connect(conn_id, url, use_ssl, verify_ssl, ca_location,
- mechanism, user_name, password);
+ mechanism, user_name, password, brokers);
}
int publish(const connection_id_t& conn_id,
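
[Editor's note] The connect() change above only concatenates the optional kafka-brokers value onto the broker parsed from the endpoint URI, yielding a single comma-separated bootstrap list (which also becomes part of the connection id). The equivalent logic, sketched in Python:

    def build_broker_list(uri_broker, extra_brokers=None):
        # merge the broker from the kafka:// URI with the optional
        # kafka-brokers attribute into one bootstrap list; duplicate
        # entries are expected to be tolerated by librdkafka
        if extra_brokers:
            return uri_broker + ',' + extra_brokers
        return uri_broker

    build_broker_list('kafka1:9092', 'kafka1:9092,kafka2:9092')
    # -> 'kafka1:9092,kafka1:9092,kafka2:9092'
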
diff --git a/src/rgw/rgw_kafka.h b/src/rgw/rgw_kafka.h
index b7aa0d15759..858b185219f 100644
--- a/src/rgw/rgw_kafka.h
+++ b/src/rgw/rgw_kafka.h
@@ -48,7 +48,8 @@ bool connect(connection_id_t& conn_id,
boost::optional<const std::string&> ca_location,
boost::optional<const std::string&> mechanism,
boost::optional<const std::string&> user_name,
- boost::optional<const std::string&> password);
+ boost::optional<const std::string&> password,
+ boost::optional<const std::string&> brokers);
// publish a message over a connection that was already created
int publish(const connection_id_t& conn_id,
diff --git a/src/test/rgw/bucket_notification/requirements.txt b/src/test/rgw/bucket_notification/requirements.txt
index a3cff2bedab..bb74eceedc3 100644
--- a/src/test/rgw/bucket_notification/requirements.txt
+++ b/src/test/rgw/bucket_notification/requirements.txt
@@ -1,4 +1,4 @@
-nose >=1.0.0
+nose-py3 >=1.0.0
boto >=2.6.0
boto3 >=1.0.0
configparser >=5.0.0
diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py
index 90ee33617fe..83d66b77b4c 100644
--- a/src/test/rgw/bucket_notification/test_bn.py
+++ b/src/test/rgw/bucket_notification/test_bn.py
@@ -410,17 +410,25 @@ kafka_server = 'localhost'
class KafkaReceiver(object):
"""class for receiving and storing messages on a topic from the kafka broker"""
- def __init__(self, topic, security_type):
+ def __init__(self, topic, security_type, kafka_server='localhost'):
from kafka import KafkaConsumer
remaining_retries = 10
port = 9092
if security_type != 'PLAINTEXT':
security_type = 'SSL'
port = 9093
+
+ if kafka_server is None:
+ endpoint = "localhost" + ":" + str(port)
+ elif ":" not in kafka_server:
+ endpoint = kafka_server + ":" + str(port)
+ else:
+ endpoint = kafka_server
+
while remaining_retries > 0:
try:
self.consumer = KafkaConsumer(topic,
- bootstrap_servers = kafka_server+':'+str(port),
+ bootstrap_servers=endpoint,
security_protocol=security_type,
consumer_timeout_ms=16000,
auto_offset_reset='earliest')
@@ -468,9 +476,9 @@ def kafka_receiver_thread_runner(receiver):
print('Kafka receiver ended unexpectedly: ' + str(error))
-def create_kafka_receiver_thread(topic, security_type='PLAINTEXT'):
+def create_kafka_receiver_thread(topic, security_type='PLAINTEXT', kafka_brokers=None):
"""create kafka receiver and thread"""
- receiver = KafkaReceiver(topic, security_type)
+ receiver = KafkaReceiver(topic, security_type, kafka_server=kafka_brokers)
task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,))
task.daemon = True
return task, receiver
@@ -1304,7 +1312,7 @@ def test_ps_s3_notification_errors_on_master():
conn.delete_bucket(bucket_name)
-def notification_push(endpoint_type, conn, account=None, cloudevents=False):
+def notification_push(endpoint_type, conn, account=None, cloudevents=False, kafka_brokers=None):
""" test pushinging notification """
zonegroup = get_config_zonegroup()
# create bucket
@@ -1359,11 +1367,13 @@ def notification_push(endpoint_type, conn, account=None, cloudevents=False):
assert_equal(status/100, 2)
elif endpoint_type == 'kafka':
# start kafka receiver
- task, receiver = create_kafka_receiver_thread(topic_name)
+ task, receiver = create_kafka_receiver_thread(topic_name, kafka_brokers=kafka_brokers)
task.start()
endpoint_address = 'kafka://' + kafka_server
# without acks from broker
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
+ if kafka_brokers is not None:
+ endpoint_args += '&kafka-brokers=' + kafka_brokers
# create s3 topic
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
@@ -1581,6 +1591,20 @@ def test_notification_push_kafka():
notification_push('kafka', conn)
+@attr('kafka_failover')
+def test_notification_push_kafka_multiple_brokers_override():
+ """ test pushing kafka s3 notification on master """
+ conn = connection()
+ notification_push('kafka', conn, kafka_brokers='localhost:9092,localhost:19092')
+
+
+@attr('kafka_failover')
+def test_notification_push_kafka_multiple_brokers_append():
+ """ test pushing kafka s3 notification on master """
+ conn = connection()
+ notification_push('kafka', conn, kafka_brokers='localhost:19092')
+
+
@attr('http_test')
def test_ps_s3_notification_multi_delete_on_master():
""" test deletion of multiple keys on master """