Managing Apache Kafka topic configurations can be a tedious task, especially when dealing with a large number of topics across multiple clusters. One common requirement is adjusting the retention period of topics to control how long messages are stored. In this blog, we'll explore how to automate this process using Python, with a focus on Amazon MSK (Managed Streaming for Apache Kafka), though the concepts can be applied to any Kafka setup. Additionally, we'll discuss how to skip specific topics based on naming conventions, such as internal Kafka topics or topics with specific keywords.
Understanding Kafka Topic Retention
Kafka topics have various configuration settings that control their behavior, one of which is retention.ms. This setting determines the duration (in milliseconds) that messages are retained in a topic before becoming eligible for deletion. Adjusting it is crucial for managing storage resources and ensuring data is available for the required period.
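Because retention.ms is expressed in milliseconds, it helps to compute the value from a human-readable duration. A minimal Python sketch (the retention_ms helper is ours, purely for illustration, not part of any Kafka API):

from datetime import timedelta

def retention_ms(days: float) -> str:
    """Convert a retention period in days to the milliseconds string Kafka expects."""
    return str(int(timedelta(days=days).total_seconds() * 1000))

print(retention_ms(7))      # 604800000   -> one week
print(retention_ms(182.5))  # 15768000000 -> roughly six months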
Setting Up Your Environment
Before starting, ensure you have Python installed on your machine. You will also need to install the confluent-kafka Python package, which provides Kafka client capabilities, including administrative operations.
pip install confluent-kafka
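To confirm the installation, you can print the client and bundled librdkafka versions, both of which are exposed by the confluent-kafka package:

import confluent_kafka

# Prints the Python client version and the underlying librdkafka version
print(confluent_kafka.version())
print(confluent_kafka.libversion())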
The Script
The script below automates the process of altering the retention period for multiple Kafka topics. It connects to your Kafka cluster, fetches a list of all topics, and updates the retention.ms configuration for each topic, excluding those that start with __ (common for internal Kafka topics) or contain the word retry.
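The skip rule is easy to isolate and test on its own before wiring it into the full script. A small sketch (the should_skip helper and the sample topic names are ours, purely for illustration):

def should_skip(topic_name: str) -> bool:
    """Return True for internal topics (prefixed with __) or retry topics."""
    return topic_name.startswith("__") or "retry" in topic_name

# __consumer_offsets and orders-retry are skipped, orders is updated
for name in ["__consumer_offsets", "orders-retry", "orders"]:
    print(name, "->", "skip" if should_skip(name) else "update")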
Script Breakdown
- Configuration: Initialize the AdminClient with the connection details of your Kafka cluster. If you're using Amazon MSK with SSL, ensure you specify the security.protocol as SSL (a boto3 sketch for looking up the MSK broker list follows this list).
- Fetching Topics: List all topics in the cluster.
- Filtering Topics: Exclude topics based on naming conventions.
- Updating Configurations: Alter the retention.ms setting for each eligible topic.
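If your cluster is on Amazon MSK, you don't have to hard-code the broker list: the AWS SDK can look it up from the cluster ARN. A hedged sketch using boto3 (the ARN below is a placeholder; this assumes your MSK cluster exposes TLS listeners):

import boto3

# Assumption: replace with your own MSK cluster ARN
cluster_arn = "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/uuid"

kafka_client = boto3.client("kafka")
brokers = kafka_client.get_bootstrap_brokers(ClusterArn=cluster_arn)

# TLS listener endpoints, suitable for security.protocol = SSL
bootstrap_servers = brokers["BootstrapBrokerStringTls"]
print(bootstrap_servers)

With the broker list in hand, the full script looks like this: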
from confluent_kafka import KafkaException
from confluent_kafka.admin import AdminClient, ConfigResource

# Connection details for the cluster; MSK TLS listeners use security.protocol=SSL
config = {
    'bootstrap.servers': 'your_broker_list',
    'security.protocol': 'SSL',
}
admin_client = AdminClient(config)

# Desired retention period: 6 months (182.5 days) in milliseconds
retention_period_ms = str(int(182.5 * 24 * 60 * 60 * 1000))

try:
    # Fetch cluster metadata, which includes every topic name
    topic_metadata = admin_client.list_topics(timeout=10)
except KafkaException as e:
    print(f"Failed to list topics: {e}")
else:
    for topic in topic_metadata.topics.values():
        topic_name = topic.topic

        # Skip internal topics (prefixed with __) and retry topics
        if topic_name.startswith("__") or 'retry' in topic_name:
            print(f"Skipping topic '{topic_name}'")
            continue

        # Request the new retention.ms value for this topic
        new_configs = {'retention.ms': retention_period_ms}
        config_resource = ConfigResource(ConfigResource.Type.TOPIC, topic_name,
                                         set_config=new_configs)
        fs = admin_client.alter_configs([config_resource])

        # alter_configs returns a future per resource; wait for each result
        for res, f in fs.items():
            try:
                f.result()
                print(f"Configuration for topic '{topic_name}' changed successfully.")
            except Exception as e:
                print(f"Failed to change configuration for topic '{topic_name}': {e}")
Execution and Considerations
- Execution: Run the script in an environment where it has network access to the Kafka cluster.
- Testing: Always test the script in a non-production environment to avoid unintended consequences.
- Customization: Adjust the script to match your specific requirements, such as different configuration settings or topic naming conventions; a parameterized, dry-run variant is sketched below.
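For example, a command-line wrapper with a dry-run mode makes it safer to review the impact before touching a production cluster. A minimal sketch (the flag names and defaults are ours, not part of the original script):

import argparse
from confluent_kafka.admin import AdminClient, ConfigResource

parser = argparse.ArgumentParser(description="Bulk-update retention.ms for Kafka topics")
parser.add_argument("--bootstrap-servers", required=True)
parser.add_argument("--retention-days", type=float, default=182.5)
parser.add_argument("--dry-run", action="store_true",
                    help="Only print the topics that would be changed")
args = parser.parse_args()

admin_client = AdminClient({'bootstrap.servers': args.bootstrap_servers,
                            'security.protocol': 'SSL'})
retention_ms = str(int(args.retention_days * 24 * 60 * 60 * 1000))

for topic_name in admin_client.list_topics(timeout=10).topics:
    if topic_name.startswith("__") or 'retry' in topic_name:
        continue
    if args.dry_run:
        print(f"Would set retention.ms={retention_ms} on '{topic_name}'")
        continue
    resource = ConfigResource(ConfigResource.Type.TOPIC, topic_name,
                              set_config={'retention.ms': retention_ms})
    for f in admin_client.alter_configs([resource]).values():
        f.result()  # raises on failure
    print(f"Updated '{topic_name}'")

Running it with --dry-run first lets you review the list of affected topics before applying the change.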
Conclusion
Automating Kafka topic configuration management can save time and reduce the risk of human error, especially in large-scale environments. This Python script provides a basic framework that can be expanded and customized to fit a variety of needs. Whether you're managing an Amazon MSK cluster or another Kafka deployment, the ability to programmatically adjust topic settings is a powerful tool in your Kafka administration arsenal.