Reactive Kafka

A high level kafka consumer which wrapps the low level api of Kafka Reactor and provides a similar usability like Spring Kafka

Лицензия

Лицензия

Категории

Категории

React Взаимодействие с пользователем Веб-фреймворки
Группа

Группа

com.quandoo.lib
Идентификатор

Идентификатор

reactive-kafka
Последняя версия

Последняя версия

1.5.1
Дата

Дата

Тип

Тип

jar
Описание

Описание

Reactive Kafka
A high level kafka consumer which wrapps the low level api of Kafka Reactor and provides a similar usability like Spring Kafka
Ссылка на сайт

Ссылка на сайт

https://github.com/quandoo/reactive-kafka
Система контроля версий

Система контроля версий

http://github.com/quandoo/reactive-kafka.git

Скачать reactive-kafka

Как подключить последнюю версию

<!-- https://jarcasting.com/artifacts/com.quandoo.lib/reactive-kafka/ -->
<dependency>
    <groupId>com.quandoo.lib</groupId>
    <artifactId>reactive-kafka</artifactId>
    <version>1.5.1</version>
</dependency>
// https://jarcasting.com/artifacts/com.quandoo.lib/reactive-kafka/
implementation 'com.quandoo.lib:reactive-kafka:1.5.1'
// https://jarcasting.com/artifacts/com.quandoo.lib/reactive-kafka/
implementation ("com.quandoo.lib:reactive-kafka:1.5.1")
'com.quandoo.lib:reactive-kafka:jar:1.5.1'
<dependency org="com.quandoo.lib" name="reactive-kafka" rev="1.5.1">
  <artifact name="reactive-kafka" type="jar" />
</dependency>
@Grapes(
@Grab(group='com.quandoo.lib', module='reactive-kafka', version='1.5.1')
)
libraryDependencies += "com.quandoo.lib" % "reactive-kafka" % "1.5.1"
[com.quandoo.lib/reactive-kafka "1.5.1"]

Зависимости

compile (5)

Идентификатор библиотеки Тип Версия
io.reactivex.rxjava2 : rxjava jar 2.2.14
io.projectreactor.addons : reactor-adapter jar 3.3.0.RELEASE
io.projectreactor.kafka : reactor-kafka jar 1.2.1.RELEASE
org.apache.kafka : kafka-clients jar 2.3.1
com.github.daniel-shuy : kafka-jackson-serializer jar 0.1.2

runtime (10)

Идентификатор библиотеки Тип Версия
org.jetbrains.kotlin : kotlin-stdlib-jdk8 jar 1.3.50
org.reflections : reflections jar 0.9.9
com.fasterxml.jackson.core : jackson-core jar 2.10.0
com.fasterxml.jackson.core : jackson-databind jar 2.10.0
com.fasterxml.jackson.datatype : jackson-datatype-jsr310 jar 2.10.0
com.fasterxml.jackson.datatype : jackson-datatype-jdk8 jar 2.10.0
com.fasterxml.jackson.module : jackson-module-kotlin jar 2.10.0
org.apache.commons : commons-lang3 jar 3.9
com.google.guava : guava jar 28.1-jre
org.slf4j : slf4j-api jar 1.7.28

Модули Проекта

Данный проект не имеет модулей.

Reactive Kafka

A high level kafka consumer which wrapps the low level api of Kafka Reactor and provides a similar usability like Spring Kafka.

Dependency

implementation("com.quandoo.lib:reactive-kafka:1.4.0")

Usage

Spring

The configuration is auto-discoverable hence only the artifact has to be included in you project and a yaml configuration has to be added.

Properties

kafka:
  bootstrap-servers: "localhost:9092"                                                     # Kafka servers
  security-protocol: "SSL"                                                                # Security protocol used (Default: PLAINTEXT)
  client-dns-lookup: "use_all_dns_ips"                                                    # Dns lookup (Default: use_all_dns_ips)
  consumer:
    group-id: ${spring.application.name}                                                  # Kafka groupId
    parallelism: 1                                                                        # How many parallel consumptions (Default: 1)
    auto-offset-reset: earliest                                                           # Offset reset (Default: latest)
    batch-size: 10                                                                        # Max number of messages per one batch (Default: 10)
    partition-assignment-strategy: "org.apache.kafka.clients.consumer.RangeAssignor"      # How to assign partitions (Default: org.apache.kafka.clients.consumer.RangeAssignor)
    commit-interval: 200                                                                  # Max time to wait until the committed messages are synced with kafka (Default: 200)
    commit-batch-size: 10                                                                 # Max number of uncommitted messages until the committed messages are synced with kafka (Default: batch-size) 
    heart-beat-interval-millis: 3000                                                      # Heart-beat period (Default: 3000)
    session-timeout-millis: 10000                                                         # Session timeout (Default: 10000)
    retry-backoff-millis: 100                                                             # How long to backoff until retrying again (Default: 100)
    max-pool-interval-millis: 300000                                                      # Max interval between 2 pools (Default: 300000)
  producer:
    max-in-flight: 10                                                                     # Max number of message un-ackd
  
  # Documented in official kafka client
  ssl:
    endpoint-identification-algorithm: ""
    protocol: ""
    enabled-protocols: ""
    provider: ""
    cypher-suites: ""
    keystore-type: ""
    keystore-location: ""
    keystore-password: ""
    key-password: ""
    truststore-type: ""
    truststore-location: ""
    truststore-password: ""
    keymanager-algorithm: ""
    trustmanager-algorithm: ""
    secure-random-implementation: ""
  # Documented in official kafka client
  sasl:
    mechanism: ""
    jaas: ""
    client-callback-handler-class: ""
    login-callback-handler-class: ""
    login-class: ""
    kerbos-service-name: ""
    kerbos-kinit-cmd: ""
    kerbos-ticket-renew-window-factor: 0.5
    kerbos-ticket-renew-jitter: 0.5
    kerbos-min-time-before-relogin: 100
    login-refresh-window-factor: 100
    login-refresh-window-jitter: 100
    login-refresh-min-period-seconds: 10
    login-refresh-buffer-seconds: 10

All consumer properties can be also specified/overloaded in the listener annotation.

Consumer configuration

The function which is handling the message has to return RxJava2 Completable or Reactor Mono. The name parameter is putting the listeners and filters in a group. Filters will apply to listeners which have the same name.

Single Listener
      // Topics support SPEL
      @KafkaListener(groupId = "test-consumer", topics = {"topic1", "topic2"}, valueType = DTO.class)
      public Completable processMessage(final ConsumerRecord<String, DTO> message) {
          // Do something
      }
Batch Listener
      // Topics support SPEL
      @KafkaListener(groupId = "test-consumer", topics = {"topic1", "topic2"}, valueType = DTO.class)
      public Mono<Void> processMessage(final List<ConsumerRecord<String, DTO>> messages) {
          // Do something
      }
Filter

Allows to filter the message after key and value deserializer

      @Component
      @KafkaListenerFilter(groupId = "test-consumer", valueClass = DTO.class)
      public class VersionFilter implements Predicate<ConsumerRecord<Object, Object>> {
      
          @Override
          Boolean apply(ConsumerRecord<Object, Object> receiverRecord) {
              return true
          }
      }
Pre-Filter

Allows to filter the message before the key and value deserializers kick in

      @Component
      @KafkaListenerPreFilter(groupId = "test-consumer")
      public class VersionFilter implements Predicate<ConsumerRecord<Bytes, Bytes>> {
      
          Boolean apply(ConsumerRecord<Bytes, Bytes> consumerRecord) {
              return true
          }
      }
Producer
      @Autowired
      private KafkaSender<String, DTO> kafkaSender;
Limitations

The current implementation supports only keys as strings and message bodies as JSON. It will use the ObjectMapper defined in the spring context

Manual

      public void createConsumer() {
              final KafkaProperties.KafkaConsumerProperties kafkaConsumerProperties = new KafkaProperties.KafkaConsumerProperties();
              kafkaConsumerProperties.setGroupId("test-consumer");
              kafkaConsumerProperties.setAutoOffsetReset("earliest");
      
              final KafkaProperties.KafkaProducerProperties kafkaProducerProperties = new KafkaProperties.KafkaProducerProperties();
              kafkaProducerProperties.setMaxInFlight(10);
      
              final KafkaProperties kafkaProperties = new KafkaProperties();
              kafkaProperties.setBootstrapServers("localhost:9092");
              kafkaProperties.setConsumer(kafkaConsumerProperties);
              kafkaProperties.setProducer(kafkaProducerProperties);
      
              final KafkaListenerMeta<? extends String, ? extends String> kafkaListenerMeta = new KafkaListenerMeta(
                      message -> {
                          // Handle
                          return Completable.complete();
                      },
                      ImmutableList.of("topic1"),
                      String.class,
                      String.class,
                      new StringDeserializer(),
                      new StringDeserializer(),
                      Predicates.alwaysTrue(),
                      Predicates.alwaysTrue()
              );
      
              final KafkaConsumer kafkaConsumer = new KafkaConsumer(kafkaProperties, ImmutableList.of(kafkaListenerMeta));
              kafkaConsumer.start();
          }

License

Apache License, Version 2.0

com.quandoo.lib

Quandoo

Public repository of Quandoo open-source projects

Версии библиотеки

Версия
1.5.1
1.5.0
1.4.0
1.3.1
1.3.0
1.2.4
1.2.3
1.2.2
1.2.1
1.2.0
1.1.0
1.0.3
1.0.2
1.0.1
1.0.0