📜  Confluent Kafka Python Producer 简介

📅  最后修改于: 2022-05-13 01:54:44.527000             🧑  作者: Mango

Confluent Kafka Python Producer 简介

Apache Kafka是一个发布-订阅消息队列,用于实时数据流。 Apache Kafka 允许您在各种微服务之间发送和接收消息。在本文中,我们将了解如何使用Python和Confluent-Kafka 库发送 JSON 消息。 JavaScript Object Notation (JSON) 是一种基于文本的标准格式,用于表示结构化数据。它是一种常见的数据格式,在电子数据交换中具有多种用途,包括带有服务器的 Web 应用程序。

先决条件:

  • 熟悉 Kafka 基本概念(例如 Kafka 主题、代理、分区、偏移量、生产者、消费者等)。
  • 良好的Python基础知识(pip install ,编写Python方法)。

解决方案 :

Kafka Python Producer 基于我们使用的 Kafka 库有不同的语法和行为。所以第一步是为我们的Python程序选择合适的 Kafka 库。

适用于Python的流行 Kafka 库:

在使用Python开发 Kafka 自动化时,我们在 Internet 上有 3 个流行的库选择:

  1. 派卡夫卡
  2. 卡夫卡蟒蛇
  3. 融合的卡夫卡

这些库中的每一个都有自己的优点和缺点,因此我们必须根据我们的项目要求进行选择。

第 1 步:选择合适的 Kafka 库

如果我们使用的是 Amazon MSK 集群,那么我们可以使用 PyKafka 或 Kafka-python 构建我们的 Kafka 框架(两者都是开源的,并且在 Apache Kafka 中最受欢迎)。如果我们使用 Confluent Kafka 集群,那么我们必须使用 Confluent Kafka Library,因为我们将获得对 Confluent 特定功能(如 ksqlDB、REST 代理和 Schema Registry)的库支持。

我们将使用 Confluent Kafka Library for Python Kafka Producer,因为我们可以使用这个库来处理 Apache Kafka 集群和 Confluent Kafka 集群。

我们需要已经安装了Python 3.x 和 Pip。我们可以执行以下命令在我们的系统中安装库。

pip install confluent-kafka

第 2 步:Kafka 身份验证设置。

与 Internet 上的大多数 Kafka Python教程不同,我们不会在 localhost 上工作。相反,我们将尝试使用 SSL 身份验证连接到远程 Kafka 集群。为了连接到 Kafka 集群,我们通常会从基础设施支持团队获得 1 个 JKS 文件和一个用于此 JKS 文件的密码。此 JKS 文件适用于Java/Spring 但不适用于Python。

所以我们的工作是将这个 JKS 文件转换成适当的格式(正如Python Kafka 库所期望的那样)。
对于 Confluent Kafka 库,我们需要将 JKS 文件转换为 PKCS12 格式,以便连接到远程 Kafka 集群。

要了解更多信息,请访问以下页面:

  1. 如何将 JKS 转换为 PKCS12?
  2. 如何使用 Confluent Kafka Python Consumer 接收消息

第 3 步:使用 SSL 身份验证的 Confluent Kafka Python Producer。

我们将使用在 JKS 期间生成的相同 PKCS12 文件到上述 PKCS 转换步骤。

Python3
import time
import json
from uuid import uuid4
from confluent_kafka import Producer
 
jsonString1 = """ {"name":"Gal", "email":"Gadot84@gmail.com", "salary": "8345.55"} """
jsonString2 = """ {"name":"Dwayne", "email":"Johnson52@gmail.com", "salary": "7345.75"} """
jsonString3 = """ {"name":"Momoa", "email":"Jason91@gmail.com", "salary": "3345.25"} """
 
jsonv1 = jsonString1.encode()
jsonv2 = jsonString2.encode()
jsonv3 = jsonString3.encode()
 
def delivery_report(errmsg, data):
    """
    Reports the Failure or Success of a message delivery.
    Args:
        errmsg  (KafkaError): The Error that occured while message producing.
        data    (Actual message): The message that was produced.
    Note:
        In the delivery report callback the Message.key() and Message.value()
        will be the binary format as encoded by any configured Serializers and
        not the same object that was passed to produce().
        If you wish to pass the original object(s) for key and value to delivery
        report callback we recommend a bound callback or lambda where you pass
        the objects along.
    """   
    if errmsg is not None:
        print("Delivery failed for Message: {} : {}".format(msg.key(), errmsg))
        return
    print('Message: {} successfully produced to Topic: {} Partition: [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))
 
kafka_topic_name = "kf.topic.empdev"   
#Change your Kafka Topic Name here. For this Example: lets assume our Kafka Topic has 3 Partitions==>  0,1,2
#And We are producing messages uniformly to all partitions.
#We are sending the message as ByteArray.
#If We want read the same message from a Java Consumer Program
#We can configure KEY_DESERIALIZER_CLASS_CONFIG = ByteArrayDeserializer.class
# and VALUE_DESERIALIZER_CLASS_CONFIG = ByteArrayDeserializer.class
 
mysecret = "yourjksPassword"
#you can call remote API to get JKS password instead of hardcoding like above
 
print("Starting Kafka Producer")   
conf = {
        'bootstrap.servers' : 'm1.msk.us-east.aws.com:9094, m2.msk.us-east.aws.com:9094, m3.msk.us-east.aws.com:9094',
        'security.protocol' : 'SSL',
        'ssl.keystore.password' : mysecret,
        'ssl.keystore.location' : './certkey.p12'
        }
         
print("connecting to Kafka topic...")
producer1 = Producer(conf)
 
# Trigger any available delivery report callbacks from previous produce() calls
producer1.poll(0)
 
try:
    # Asynchronously produce a message, the delivery report callback
    # will be triggered from poll() above, or flush() below, when the message has
    # been successfully delivered or failed permanently.
    producer1.produce(topic=kafka_topic_name, key=str(uuid4()), value=jsonv1, on_delivery=delivery_report)
    producer1.produce(topic=kafka_topic_name, key=str(uuid4()), value=jsonv2, on_delivery=delivery_report)
    producer1.produce(topic=kafka_topic_name, key=str(uuid4()), value=jsonv3, on_delivery=delivery_report)
     
    # Wait for any outstanding messages to be delivered and delivery report
    # callbacks to be triggered.
    producer1.flush()
     
except Exception as ex:
    print("Exception happened :",ex)
     
print("\n Stopping Kafka Producer")


上述代码的示例输出:

Starting Kafka Producer
connecting to Kafka topic...
Message: b'4acef7b3-dx55-5f89-b69r-18b3188f919z' successfully produced to Topic: kf.topic.empdev Partition: [1] at offset 43211
Message: b'98xff6y4-crl5-gfgx-dq1r-k3z5122h611v' successfully produced to Topic: kf.topic.empdev Partition: [2] at offset 43210
Message: b'rus3v9xx-0bd9-astn-mrtn-yyz1920evl6r' successfully produced to Topic: kf.topic.empdev Partition: [0] at offset 43211

Stopping Kafka Producer

结论 :

我们对如何使用Python在 Kafka 主题上发布 JSON 消息有了一些想法。因此,我们可以根据我们的项目需求扩展此代码,并继续修改和开发我们的 Kafka 自动化框架。我们还可以根据某些条件将所有消息发送到特定的 Kafka 分区,而不是平等地发送到所有分区。要探索更多关于 Confluent kafka Python库的信息,我们可以访问:Confluent Docs