📜  kafkajs - Javascript 代码示例

📅  最后修改于: 2022-03-11 15:03:27.348000             🧑  作者: Mango

代码示例1
const { Kafka } = require('kafkajs')

const kafka = new Kafka({
    clientId: 'node-app',
    brokers: ['localhost:9092'],
    requestTimeout: 3000,
    connectionTimeout: 6000,
    ssl: false
})

exports.producer = async (eventName, data) => {
    const producer = kafka.producer()

    // event kafka producer notification
    await producer.on('producer.connect', () => console.info('producer kafka connected'))
    await producer.on('producer.disconnect', () => console.error('producer kafka disconnect'))
    await producer.on('producer.network.request_timeout', () => console.error('producer kafka network timeout'))

    await producer.connect()
    await producer.send({
        topic: eventName,
        messages: data,
        acks: true,
        compression: 1
    })

    await producer.disconnect()
}

exports.consumer = async (eventName, callback) => {
    const consumer = kafka.consumer({
        groupId: 'test-group',
        maxBytes: 1048576000, // 1GB
        maxBytesPerPartition: 1048576000, // 1GB
        sessionTimeout: 60000,
        heartbeatInterval: 6000,
        rebalanceTimeout: 30000
    })

    // event kafka consumer notification
    await consumer.on('consumer.connect', () => console.info('consumer kafka connected'))
    await consumer.on('consumer.disconnect', () => console.error('consumer kafka disconnect'))
    await consumer.on('consumer.crash', () => console.error('consumer kafka crash'))
    await consumer.on('consumer.stop', () => console.error('consumer kafka stop'))
    await consumer.on('consumer.network.request_timeout', () => console.error('consumer kafka network timeout'))

    await consumer.connect()
    await consumer.subscribe({ topic: eventName, fromBeginning: true })
    await consumer.run({ autoCommit: true, eachMessage: callback })
}