Kafka
Apache Kafka is a scalable distributed streaming platform. It’s best suited for handling real-time data streams. The Kafka add-on provides an integration of both streams and pub/sub clients, using the Kafka API.
Dependencies
To add the Kafka add-on to your project, add the following dependency:
<dependency>
    <groupId>org.seedstack.addons.kafka</groupId>
    <artifactId>kafka</artifactId>
</dependency>dependencies {
    compile("org.seedstack.addons.kafka:kafka:2.0.2")
}You must also add the Apache KAFKA implementation for basic clients at least, and optionally for streams:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.2.0</version>
</dependency>dependencies {
    compile("org.apache.kafka:kafka-streams:2.2.0")
}<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>dependencies {
    compile("org.apache.kafka:kafka-clients:2.2.0")
}Configuration
Configuration is done by declaring one or more MQTT clients:
kafka:
  # Configured Kafka streams with the name of the stream as key
  streams:
    stream1:
      # Kafka properties for configuring the stream
      properties:
        property1: value1
  consumers:
    # Configured Kafka consumer with the name of the consumer as key
    consumer1:
      # Kafka properties for configuring the consumer
      properties:
        property1: value1
  producers:
    # Configured Kafka producer with the name of the producer as key
    producer1:
      # Kafka properties for configuring the producer
      properties:
        property1: value1
To dump the kafka configuration options:
mvn -q -Dargs="kafka" seedstack:configTo enable transactions for a particular producer, specify the transactional.id property on the producer with a unique id. Then use the Kafka transactional API to send messages.
Publishing
To publish messages, inject the Producer interface qualified with a
configured producer name:
public class SomeClass {
    @Inject
    @Named("producer1")
    private Producer<Integer, String> producer;
}
Use the Kafka API to send messages. If the produced is configured as transactional, you must enclose your calls to the
send() method with the programmatic Kafka transaction methods:
public class SomeClass {
    @Inject
    @Named("producer1")
    private Producer<Integer, String> producer;
    
    public void someMethod() throws InterruptedException, IOException {
        producer.send(new ProducerRecord<>("topic", "test"));
    }
}
Do not explicitly close the producer, it will be automatically closed on application shutdown.
Receiving
To receive Kafka records, create a class implementing the ConsumerListener
interface and annotated with @KafkaListener:
@KafkaListener(value = "consumer1", topics = "someTopic")
public class MyConsumerListener implements ConsumerListener<Integer, String> {
    @Logging
    private Logger logger;
    @Override
    public void onConsumerRecord(ConsumerRecord<Integer, String> r) {
        logger.info("Received {}:{}", r.key(), r.value());
    }
    @Override
    public void onException(Exception e) throws Exception {
        // process any exception and re-throw an exception if reception must be temporary stopped 
    }
}
If an exception is re-thrown from the onException() method, the reception will temporarily stop and the underlying
consumer will be gracefully shutdown. A new attempt, with new consumer and listener instances, will be scheduled after
the retry delay.
Using the annotation, you can specify:
- The name of the consumer in configuration that will be used to create the underlying consumer.
- The topic or the topic regular expression pattern to subscribe to.
- The delay to wait before retry in milliseconds.
Streaming
To build a Kafka stream subscribed to one or more topic(s), create a class implementing the StreamBuilder
interface and annotated with @KafkaListener:
@KafkaListener(value = "stream1", topics = "someTopic")
public class MyStreamBuilder implements StreamBuilder<Integer, String> {
    @Logging
    private Logger logger;
    @Override
    public void buildStream(KStream<Integer, String> stream) {
        stream.foreach((key, value) -> {
            logger.info("Processed: {}:{}", key, value);
        });
    }
    @Override
    public void onException(Exception e) {
        // process any exception and re-throw an exception if reception must be temporary stopped 
    }
}
If an exception is re-thrown from the onException() method, the streaming will temporarily stop and the underlying
stream client will be gracefully shutdown. A new attempt, with new stream client and builder instances, will be
scheduled after the retry delay.
Using the annotation, you can specify:
- The name of the stream in configuration that will be used to create the underlying stream client.
- The topic or the topic regular expression pattern to subscribe to.
- The delay to wait before retry in milliseconds.
 Maven
Maven Gradle
Gradle