
Adding a Kafka Connection

Prerequisites

  • A user with sufficient permissions is required to establish a connection with Kafka.
  • Network traffic from Zeenea to the data source must be open:

    Target                | Protocol    | Usual Ports
    ----------------------|-------------|---------------
    Kafka Broker          | TCP         | 9092
    Kafka Schema Registry | HTTP, HTTPS | 80, 443, 8081
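Before declaring the connection, it can be useful to verify that these ports are reachable from the scanner host. A minimal sketch in Python (the hostnames below are placeholders, not real endpoints):

```python
import socket

def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Placeholder endpoints: replace with your actual broker and Schema Registry hosts.
for host, port in [("broker1.example.com", 9092), ("schema-registry.example.com", 8081)]:
    status = "open" if port_is_open(host, port) else "unreachable"
    print(f"{host}:{port} -> {status}")
```

A successful TCP connection only proves the port is open; authentication and TLS issues are reported later, by the connector itself.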

Supported Versions

The connector was developed and tested with Kafka Server version 2.8.0 and is compatible with versions 3.0 and earlier.

Installing the Plugin

The Kafka connector is available as a plugin and can be downloaded here: Zeenea Connector Downloads.

For more information on how to install a plugin, please refer to the following article: Installing and Configuring Connectors as a Plugin.

Declaring the Connection

Creating and configuring connectors is done through a dedicated configuration file located in the /connections folder of the relevant scanner.

Read more: Managing Connections

To establish a connection with a Kafka instance, the following parameters must be specified in the dedicated file:

  • name: The name displayed to catalog users for this connection.
  • code: The unique identifier of the connection on the Zeenea platform. Once registered on the platform, this code must not be modified, or the connection will be considered new and the old one removed from the scanner.
  • connector_id: The type of connector to be used for the connection. Here, the value must be kafka and must not be modified.
  • connection.bootstrap_servers: A comma-separated list of host:port pairs used to establish the initial connection to the Kafka cluster. Example: broker1.example.com:9092,broker2.example.com:9092
  • connection.security_protocol: Protocol used to communicate with brokers (property security.protocol of the Kafka client). Valid values: PLAINTEXT, SSL (SASL_PLAINTEXT and SASL_SSL coming soon). Default: PLAINTEXT
  • connection.schema_registry_url: Kafka Schema Registry URL, including the protocol (example: http://schema-registry.example.com:8081)
  • connection.username: Username for Schema Registry basic authentication
  • connection.password: Password for Schema Registry basic authentication
  • connection.api_key: API key name for Schema Registry API authentication
  • connection.api_secret: API key secret for Schema Registry API authentication
  • inventory.strategy: Inventory strategy. Possible values are topic_sample or schema_registry, depending on the desired mechanism for retrieving topic schemas.
  • inventory.sampling_size: Number of messages sampled per topic (default: 1000).
  • inventory.topic_exclude: (Optional; default: _*) Names of topics to ignore, as a list of patterns separated by spaces. Each pattern is either a regular expression or a simple pattern (glob), according to its regex: or glob: prefix. If no prefix is specified, glob is assumed. If one of the patterns matches the full name of a topic, that topic is ignored. The behavior is similar to that of the ~ operator in filters. Example: _* glob:test* regex:.*_(test|tu)_.*
  • tls.truststore.path: The trust store file path. This file must be provided when TLS encryption is activated (protocol https) and the certificates of the Kafka servers are delivered by a specific authority. It must contain the certification chain.
  • tls.truststore.password: Password of the trust store file
  • tls.truststore.type: Type of the trust store file (PKCS12 or JKS). By default, the type is derived from the file extension.
  • tls.keystore.path: The key store file path. This file must be provided when TLS encryption is activated (protocol https) and the certificates of the Kafka servers are delivered by a specific authority. It must contain the certification chain.
  • tls.keystore.password: Password of the key store file
  • tls.keystore.type: Type of the key store file (PKCS12 or JKS). By default, the type is derived from the file extension.
  • tls.key.password: Password of the private key (if different from the key store password)
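The inventory.topic_exclude patterns are matched against the full topic name, with glob or regex semantics depending on the prefix. A rough illustration of the documented matching rules (a sketch, not the connector's actual implementation):

```python
import fnmatch
import re

def topic_excluded(topic: str, patterns: str) -> bool:
    """Return True if the topic's full name matches any space-separated pattern.

    Patterns prefixed with 'regex:' are regular expressions, 'glob:' are globs;
    an unprefixed pattern is treated as a glob. This mirrors the documented
    rules for inventory.topic_exclude, as an illustration only.
    """
    for pattern in patterns.split():
        if pattern.startswith("regex:"):
            if re.fullmatch(pattern[len("regex:"):], topic):
                return True
        else:
            glob = pattern[len("glob:"):] if pattern.startswith("glob:") else pattern
            if fnmatch.fnmatchcase(topic, glob):
                return True
    return False

# The documented example list: internal topics (_*), test*, and *_test_*/*_tu_*.
patterns = "_* glob:test* regex:.*_(test|tu)_.*"
```

With this list, __consumer_offsets, test-orders, and orders_test_v1 would all be excluded, while orders would be kept.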
Note: A template of the configuration file is available in this repository.
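As an illustration only, a connection file combining the parameters above might look like the following; the file name, values, and exact syntax (dotted keys in a HOCON-style file) are assumptions, so refer to the template in the repository for the authoritative layout:

```
name = "Kafka Production"
code = "kafka-prod"
connector_id = "kafka"

connection.bootstrap_servers = "broker1.example.com:9092,broker2.example.com:9092"
connection.security_protocol = "PLAINTEXT"
connection.schema_registry_url = "http://schema-registry.example.com:8081"

inventory.strategy = "schema_registry"
inventory.topic_exclude = "_*"
```

Remember that code must remain stable once the connection is registered on the platform.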

Data Extraction

Topic Sample

The connector reads the last inventory.sampling_size messages of each topic and extracts the schema from this sample. The messages must be in Avro format.
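The idea behind the topic_sample strategy can be pictured as merging the fields observed across a sample of messages. A simplified sketch (the real connector decodes Avro; here the messages are assumed to be already decoded into dictionaries):

```python
from typing import Any

def infer_schema(sample: list[dict[str, Any]]) -> dict[str, set[str]]:
    """Merge the fields seen across a sample of decoded messages.

    Returns a mapping of field name -> set of observed Python type names.
    A simplified illustration of sampling-based schema discovery, not the
    connector's actual Avro handling.
    """
    schema: dict[str, set[str]] = {}
    for message in sample:
        for field, value in message.items():
            schema.setdefault(field, set()).add(type(value).__name__)
    return schema

# Hypothetical sample: the 'coupon' field only appears in some messages.
sample = [
    {"order_id": 1, "amount": 9.99},
    {"order_id": 2, "amount": 12.5, "coupon": "WELCOME"},
]
```

The larger inventory.sampling_size is, the more likely rarely populated fields are to appear in the inferred schema.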

Schema Registry

The connector retrieves from the Schema Registry the subjects whose names are suffixed with -key and -value to determine the list of topics and the list of their fields.
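Under the Schema Registry's default naming convention, subjects are named after the topic plus a -key or -value suffix, so the topic list can be derived by stripping those suffixes. A sketch of that grouping (an illustration of the naming convention, not the connector's code):

```python
def topics_from_subjects(subjects: list[str]) -> dict[str, dict[str, str]]:
    """Group Schema Registry subjects into topics.

    Subjects following the default topic-name strategy are named
    '<topic>-key' and '<topic>-value'; any other subject is ignored here.
    Returns topic -> {"key": subject, "value": subject} (entries optional).
    """
    topics: dict[str, dict[str, str]] = {}
    for subject in subjects:
        for suffix in ("-key", "-value"):
            if subject.endswith(suffix):
                topics.setdefault(subject[: -len(suffix)], {})[suffix[1:]] = subject
    return topics

# Hypothetical subjects: two topics, one of them without a key schema.
subjects = ["orders-key", "orders-value", "payments-value"]
```

A topic may therefore expose a key schema, a value schema, or both, which is reflected in the technical metadata below.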

Collected Metadata

Inventory

The inventory collects the list of topics and their fields.

Dataset

A dataset is a topic.

  • Name
  • Source Description
  • Technical Data:
    • Topic Format (topic_sample)
    • Schema Key Type (schema_registry)
    • Schema Key Version (schema_registry)
    • Schema Value Type (schema_registry)
    • Schema Value Version (schema_registry)

Field

Topic field.

  • Name
  • Source Description
  • Type
  • Can be null: yes, by default
  • Multivalued: Not supported. Default value false.
  • Primary Key: Not supported. Default value false.
  • Technical Metadata:
    • Schema Name (schema_registry)

Object Identification Keys

An identification key is associated with each object in the catalog. When the object is created by a connector, the connector builds this key.

Read more: Identification Keys

Dataset: code/topic/dataset name
  • code: Unique identifier of the connection, as noted in the configuration file
  • dataset name: Topic name

Field: code/topic/dataset name/schema type/field name
  • code: Unique identifier of the connection, as noted in the configuration file
  • dataset name: Topic name
  • schema type: key or value, depending on the source of the Schema Registry field; always value with the topic_sample strategy
  • field name: Name of the field