kafka-protobuf-serde

Serializer/Deserializer for Kafka to serialize/deserialize Protocol Buffers messages

Лицензия

Лицензия

Категории

Категории

Protobuf Данные Data Structures
Группа

Группа

com.github.daniel-shuy
Идентификатор

Идентификатор

kafka-protobuf-serde
Последняя версия

Последняя версия

2.2.0
Дата

Дата

Тип

Тип

jar
Описание

Описание

kafka-protobuf-serde
Serializer/Deserializer for Kafka to serialize/deserialize Protocol Buffers messages
Ссылка на сайт

Ссылка на сайт

https://github.com/daniel-shuy/kafka-protobuf-serde
Система контроля версий

Система контроля версий

https://github.com/daniel-shuy/kafka-protobuf-serde.git

Скачать kafka-protobuf-serde

Как подключить последнюю версию

<!-- https://jarcasting.com/artifacts/com.github.daniel-shuy/kafka-protobuf-serde/ -->
<dependency>
    <groupId>com.github.daniel-shuy</groupId>
    <artifactId>kafka-protobuf-serde</artifactId>
    <version>2.2.0</version>
</dependency>
// https://jarcasting.com/artifacts/com.github.daniel-shuy/kafka-protobuf-serde/
implementation 'com.github.daniel-shuy:kafka-protobuf-serde:2.2.0'
// https://jarcasting.com/artifacts/com.github.daniel-shuy/kafka-protobuf-serde/
implementation ("com.github.daniel-shuy:kafka-protobuf-serde:2.2.0")
'com.github.daniel-shuy:kafka-protobuf-serde:jar:2.2.0'
<dependency org="com.github.daniel-shuy" name="kafka-protobuf-serde" rev="2.2.0">
  <artifact name="kafka-protobuf-serde" type="jar" />
</dependency>
@Grapes(
@Grab(group='com.github.daniel-shuy', module='kafka-protobuf-serde', version='2.2.0')
)
libraryDependencies += "com.github.daniel-shuy" % "kafka-protobuf-serde" % "2.2.0"
[com.github.daniel-shuy/kafka-protobuf-serde "2.2.0"]

Зависимости

compile (3)

Идентификатор библиотеки Тип Версия
org.apache.kafka : kafka-clients jar 2.2.1
com.google.protobuf : protobuf-java jar 3.8.0
org.apache.kafka : kafka-clients jar 2.2.1

test (5)

Идентификатор библиотеки Тип Версия
org.springframework.boot : spring-boot-starter-test jar 2.1.5.RELEASE
org.springframework.kafka : spring-kafka jar 2.2.6.RELEASE
org.springframework.kafka : spring-kafka-test jar 2.2.6.RELEASE
org.apache.kafka : kafka_2.11 jar 2.2.1
org.apache.kafka : kafka_2.11 jar 2.2.1

Модули Проекта

Данный проект не имеет модулей.

kafka-protobuf-serde

Branch Travis CI CodeFactor Codacy Better Code Hub Coverall
Master Build Status CodeFactor Codacy Badge BCH compliance Coverage Status
Develop Build Status CodeFactor Codacy Badge BCH compliance Coverage Status

Serializer/Deserializer for Kafka to serialize/deserialize Protocol Buffers messages

Requirements

Dependency Version
Kafka 2.X.X
Protobuf 3.X.X
Java 8+

Usage

Add the following to your Maven dependency list:

<dependency>
    <groupId>com.github.daniel-shuy</groupId>
    <artifactId>kafka-protobuf-serde</artifactId>
    <version>2.2.0</version>
</dependency>

Override the protobuf.version property with the version of Protobuf you wish to use (WARNING: do not directly override the protobuf-java dependency version):

<properties>
    <protobuf.version>3.13.0</protobuf.version>
</properties>

Optionally, you may also override the kafka-clients dependency version with the version of Kafka you wish to use:

<properties>
    <kafka.version>2.3.1</kafka.version>
</properties>

Kafka Producer

Properties props = new Properties();
// props.put(..., ...);

Producer<String, MyValue> producer = new KafkaProducer<>(props,
    new StringSerializer(),
    new KafkaProtobufSerializer<>());

producer.send(new ProducerRecord<>("topic", new MyValue()));

Kafka Consumer

Properties props = new Properties();
// props.put(..., ...);

Consumer<String, MyValue> consumer = new KafkaConsumer<>(props,
    new StringDeserializer(),
    new KafkaProtobufDeserializer<>(MyValue.parser()));

consumer.subscribe(Collections.singleton("topic"));
ConsumerRecords<String, MyValue> records = consumer.poll(Duration.ofMillis(100));

records.forEach(record -> {
    String key = record.key();
    MyValue value = record.value();

    // ...
});

Kafka Streams

Serde<String> stringSerde = Serdes.String();
Serde<MyValue> myValueSerde = new KafkaProtobufSerde<>(MyValue.parser());

Properties config = new Properties();
// config.put(..., ...);

StreamsBuilder builder = new StreamsBuilder();
KStream<String, MyValue> myValues = builder.stream("input_topic", Consumed.with(stringSerde, myValueSerde));
KStream<String, MyValue> filteredMyValues = myValues.filter((key, value) -> {
    // ...
});
filteredMyValues.to("output_topic", Produced.with(stringSerde, myValueSerde));

Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, config);
streams.setUncaughtExceptionHandler((thread, throwable) -> {
    // ...
});
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();

Spring for Apache Kafka (spring-kafka)

Kafka Producer

@Configuration
public class KafkaConfig {
    @Bean
    public ProducerFactory<String, MyValue> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        // props.put(..., ...);

        return new DefaultKafkaProducerFactory<>(producerProps,
                new StringSerializer(),
                new KafkaProtobufSerializer<>());
    }

    @Bean
    public KafkaTemplate<String, MyValue> kafkaTemplate() {
        return new KafkaTemplate(producerFactory());
    }
}

Kafka Consumer

@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MyValue>>
            kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, MyValue> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // props.put(..., ...);

        return new DefaultKafkaConsumerFactory<>(props,
            new StringDeserializer(),
            new KafkaProtobufDeserializer<>(MyValue.parser()));
    }
}

public class Listener {
    @KafkaListener(id = "foo", topics = "annotated1")
    public void listen1(String foo) {
        // ...
    }
}

Версии библиотеки

Версия
2.2.0
2.1.1
2.1.0
2.0.0
1.0.0