Kafka

Apache Kafka is a distributed event streaming platform used for building real-time data pipelines and applications. It allows you to publish, subscribe to, store, and process streams of records in a fault-tolerant way. Kafka is designed for high-volume publish-subscribe messaging and streaming, and is built to be durable, fast, and scalable. Essentially, Kafka lets you model your data as a continuous stream of events that can be consumed in real time or stored for later processing. Common use cases include messaging, website activity tracking, metrics collection and monitoring, log aggregation, stream processing, and event sourcing.

Kafka with KRaft

Kafka is deprecating ZooKeeper ..

It's going to be replaced with KRaft .. if you haven't come across it before, here's some background ..

The core concept of the new quorum controller setup is that Kafka is itself log-based. Changes to the metadata can be represented as messages stored in a log, which can then be streamed to subscribers.

In KRaft mode, we can designate multiple Kafka instances as controllers. A single node can work solely as a broker, solely as a controller, or perform both roles at once (very handy in smaller clusters). This is different from the legacy setup, where we had only one controller. Still, even though we can have multiple controllers, only one is active at any particular moment, and all others are on standby. If the active controller fails, one of the standby controllers takes over its tasks.

Only the active controller can make changes to the cluster’s metadata. It persists the updates in a special internal topic (with just one partition) called __cluster_metadata. Messages from that topic are then replicated by all other controllers, so all of them hold an almost up-to-date version of the data in their local replicas. This is a big deal: a new controller no longer has to fetch all the data from ZooKeeper; it already has everything in its local log and may just need to catch up on a few missed messages.
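
To make this concrete, here is a minimal sketch of a KRaft-mode server.properties for a single node running as both broker and controller. The property names are standard Kafka KRaft settings; the node id, ports, and paths are only example values:

# server.properties excerpt - combined broker + controller node (example values)
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093
listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
controller.listener.names=CONTROLLER
log.dirs=/tmp/kraft-combined-logs

The storage directory must be formatted once before the first start, for example:

kafka-storage.sh random-uuid
kafka-storage.sh format -t <CLUSTER_UUID> -c config/kraft/server.properties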

As part of the Pentaho Data Integration & Analytics plugin release journey to decouple plugins from the core Pentaho Server, Pentaho EE 9.5 GA is releasing new plugins and enhancements to its existing plugin collection.

This section is for reference only; the plugin has already been downloaded and installed.

Kafka Offset

A new job entry called Kafka Offset has been added to enable you to change the offset of a topic partition. This job entry has fields to connect to a Kafka broker or cluster in its Setup and Options tabs.
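
Like any other job entry, a job containing Kafka Offset can be run headless with Kitchen. A minimal sketch, assuming a hypothetical job file kafka_offset.kjb with a BROKER parameter defined in it:

cd data-integration
# run the job from the command line (file name and parameter are placeholders)
./kitchen.sh -file=/path/to/kafka_offset.kjb -param:BROKER=localhost:9092 -level=Basic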

The following improvements have been made to the Kafka Producer and Kafka Consumer steps:

  • Encryption is supported for connection parameters.

  • SSL and Kerberos (SASL) connectivity have been certified.

  • You can now use variables from Kettle properties, PDI environment variables, and parameter variables in the Kafka properties settings on the Options tab (see the kettle.properties sketch after this list).

  • The Kafka client library has been upgraded to 3.4.0.

  • Logging has been improved to make debugging easier.

  • The Kafka Consumer step has been improved to consume messages up to the timestamp set on the Offset Settings tab of the Kafka Offset job entry.

  • An offset rebalancer has been added to correctly commit offsets if a rebalance occurs when a new consumer is added to, or an existing consumer is removed from, the consumer group.
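
For example, connection details can be kept out of the transformation by defining variables in kettle.properties and referencing them in the step settings. A minimal sketch; the variable names below are just examples:

# ~/.kettle/kettle.properties (example variable names)
KAFKA_BOOTSTRAP_SERVERS=broker1:9092,broker2:9092
KAFKA_CONSUMER_GROUP=pdi-consumer-group

# In the Kafka Consumer / Producer step, reference them as ${KAFKA_BOOTSTRAP_SERVERS} and ${KAFKA_CONSUMER_GROUP}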

  1. Log into the 'Pentaho Support Portal' and download the plugin.

  2. Select the Pentaho version.

  3. Download selected plugin(s).

  4. Extract Kafka plugins.

cd
cd ~/Downloads
unzip Kafka.zip

Kafka Job Plugin

  1. Unzip kafka-job plugin.

cd ~/Downloads/Kafka
unzip kafka-job-plugins-10.1.0.0-317-dist.zip
  2. Install kafka-job plugin.

cd
cd ~/Downloads/kafka-job-plugins-plugin-10.1.0.0-317-dist/kafka-job-plugins-plugin-10.1.0.0-317
./install.sh
  3. Accept License Agreement -> Next

  4. Browse to ../data-integration/plugins directory.

  5. Click 'Next' and accept overwrite warning.

  6. Click Next to complete installation.

  7. Restart Pentaho Data Integration & check Kafka.
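
After the restart, you can sanity-check that the plugin files landed in the right place. A minimal sketch, assuming PDI is installed under ~/Pentaho/design-tools/data-integration (adjust the path for your environment):

# list Kafka-related plugin folders in the PDI plugins directory (path is an example)
ls ~/Pentaho/design-tools/data-integration/plugins | grep -i kafka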

Topics

To create a new topic

kafka-topics.sh --create --topic <TOPIC_NAME> --bootstrap-server localhost:9092

By default the partition count is set to 1 and the replication factor is set to 1. You can specify a particular partition count and replication factor with

kafka-topics.sh --create --topic <TOPIC_NAME> --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

To list the topics

kafka-topics.sh --list --bootstrap-server localhost:9092

To describe a topic

kafka-topics.sh --describe --topic <TOPIC_NAME> --bootstrap-server localhost:9092

To delete a topic

kafka-topics.sh --delete --topic <TOPIC_NAME> --bootstrap-server localhost:9092

Producer

To produce messages to the topic

kafka-console-producer.sh --topic <TOPIC_NAME> --bootstrap-server localhost:9092

By default, if the topic you specify does not exist, Kafka will create it with the default partition count and replication factor set in server.properties. You can edit these defaults at kafka/config/server.properties.
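
The relevant defaults look roughly like this; these are standard Kafka broker settings, and the values shown are only examples:

# kafka/config/server.properties (excerpt)
auto.create.topics.enable=true
num.partitions=1
default.replication.factor=1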

You can set a different acknowledgement level with the acks producer property

kafka-console-producer.sh --topic <TOPIC_NAME> --bootstrap-server localhost:9092 --producer-property acks=all

Consumer

To consume the topic

kafka-console-consumer.sh --topic <TOPIC_NAME> --bootstrap-server localhost:9092

By default the consumer only reads messages produced after the command is executed. To consume from the beginning of the topic

kafka-console-consumer.sh --topic <TOPIC_NAME> --bootstrap-server localhost:9092 --from-beginning

To run a consumer as part of a consumer group

kafka-console-consumer.sh --topic <TOPIC_NAME> --bootstrap-server localhost:9092 --group log-application-group-1

Consumer Groups

To list all the consumer groups

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

To describe a consumer group

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <CONSUMER_GROUP>

This command shows the consumer group's current offset, the log end offset, and the lag for each partition.

To reset the offset of a consumer group

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <CONSUMER_GROUP> --reset-offsets --to-earliest --execute --topic first_topic

This command will reset the offset for a specific topic. To reset for all topics

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <CONSUMER_GROUP> --reset-offsets --to-earliest --execute --all-topics

To shift the offset back by 2

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <CONSUMER_GROUP> --reset-offsets --shift-by -2 --execute --all-topics

If a positive number is provided the offset shifts forward, and a negative number shifts it backward.

To delete a consumer group

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group <CONSUMER_GROUP>

You can’t delete a consumer group while it is active.


KRaft

To describe the status of the KRaft metadata quorum

kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status

To dump and decode the cluster metadata log for debugging

kafka-dump-log.sh --cluster-metadata-decoder --files <PATH_TO_LOG>
Pentaho EE Marketplace Plugin Releases: https://support.pentaho.com/hc/en-us/articles/17591496360589-Pentaho-EE-Marketplace-Plugins-Release