# Kafka

<Note>
  First time creating a connector? [Read this first](/api-reference/workflow/connector-first-time-reqs).
</Note>

Ingest your files into Unstructured from Kafka.

## Requirements

You will need:

* A Kafka cluster in [Confluent Cloud](https://www.confluent.io/confluent-cloud).
  ([Create a cluster](https://docs.confluent.io/cloud/current/clusters/create-cluster.html#create-ak-clusters).)

  The following video shows how to set up a Kafka cluster in Confluent Cloud:

  <iframe width="560" height="315" src="https://www.youtube.com/embed/zcKJ96J4Xvk" title="YouTube video player" frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowfullscreen />

* The [hostname and port number](https://docs.confluent.io/cloud/current/clusters/create-cluster.html#view-a-ak-cluster) of the bootstrap Kafka cluster to connect to.

* The name of the topic to read messages from or write messages to on the cluster.
  [Create a topic](https://docs.confluent.io/cloud/current/client-apps/topics/index.html#create-topics).
  [Access available topics](https://docs.confluent.io/cloud/current/client-apps/topics/index.html#create-topics).

* For authentication, an [API key and secret](https://docs.confluent.io/cloud/current/security/authenticate/workload-identities/service-accounts/api-keys/manage-api-keys.html#add-an-api-key).
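
The connector settings take the bootstrap hostname and port as separate values, while a bootstrap endpoint is often written as a single `host:port` string. The following is a minimal sketch of splitting such a string. The function name `split_bootstrap_endpoint` and the sample hostname are hypothetical, and the fallback port of `9092` is taken from the default listed under the configuration settings on this page.

```python
def split_bootstrap_endpoint(endpoint: str, default_port: int = 9092) -> tuple[str, int]:
    """Split a 'host:port' string into (host, port).

    Hypothetical helper, not part of the Unstructured SDK.
    Falls back to default_port when the string contains no port.
    """
    cleaned = endpoint.strip()
    host, sep, port = cleaned.rpartition(":")
    if not sep:
        # No colon present: treat the whole string as the hostname.
        return cleaned, default_port
    return host, int(port)


# Hypothetical endpoint value, for illustration only.
host, port = split_bootstrap_endpoint("pkc-12345.us-east-1.aws.confluent.cloud:9092")
print(host, port)
```

This sketch does not handle endpoints that carry a protocol prefix such as `SASL_SSL://`; strip any prefix before calling it.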

## Examples

To create a Kafka source connector, see the following examples.

For more information on working with source connectors using the Unstructured API, see [Source endpoints](/api-reference/api/source/source-apis).

<CodeGroup>
  ```python Python SDK theme={null}
  import os

  from unstructured_client import UnstructuredClient
  from unstructured_client.models.operations import CreateSourceRequest
  from unstructured_client.models.shared import CreateSourceConnector

  with UnstructuredClient(api_key_auth=os.getenv("UNSTRUCTURED_API_KEY")) as client:
      response = client.sources.create_source(
          request=CreateSourceRequest(
              create_source_connector=CreateSourceConnector(
                  name="<name>",
                  type="kafka-cloud",
                  config={
                      "bootstrap_servers": "<bootstrap-server>",
                      "port": <port>,
                      "group_id": "<group-id>",
                      "kafka_api_key": "<kafka-api-key>",
                      "secret": "<secret>",
                      "topic": "<topic>",
                      "num_messages_to_consume": <num-messages-to-consume>
                  }
              )
          )
      )

      print(response.source_connector_information)
  ```

  ```bash curl theme={null}
  curl --request 'POST' --location \
  "$UNSTRUCTURED_API_URL/sources" \
  --header 'accept: application/json' \
  --header "unstructured-api-key: $UNSTRUCTURED_API_KEY" \
  --header 'content-type: application/json' \
  --data \
  '{
      "name": "<name>",
      "type": "kafka-cloud",
      "config": {
          "bootstrap_servers": "<bootstrap-server>",
          "port": <port>,
          "group_id": "<group-id>",
          "kafka_api_key": "<kafka-api-key>",
          "secret": "<secret>",
          "topic": "<topic>",
          "num_messages_to_consume": <num-messages-to-consume>
      }
  }'
  ```
</CodeGroup>

## Configuration settings

Replace the preceding placeholders as follows:

<ParamField body="name" type="string" required>
  A unique name for this connector.
</ParamField>

<ParamField body="bootstrap_servers" type="string">
  The hostname of the bootstrap Kafka cluster to connect to.
</ParamField>

<ParamField body="port" type="integer" default="9092">
  The port number of the bootstrap Kafka cluster to connect to.
</ParamField>

<ParamField body="group_id" type="string" default="default_group_id">
  The ID of the consumer group. A consumer group is a way to allow a pool of consumers to divide the consumption of data over topics and partitions.
</ParamField>

<ParamField body="kafka_api_key" type="string">
  For authentication, the API key for access to the cluster.
</ParamField>

<ParamField body="secret" type="string">
  For authentication, the secret for access to the cluster.
</ParamField>

<ParamField body="topic" type="string">
  The name of the topic to read messages from or write messages to on the cluster.
</ParamField>

<ParamField body="batch_size" type="integer" default="100">
  Destination connector only. The maximum number of messages to send in a single batch.
</ParamField>

<ParamField body="num_messages_to_consume" type="integer" default="100">
  Source connector only. The maximum number of messages that the consumer will try to consume.
</ParamField>
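
To make the documented defaults explicit before sending a request, the optional source settings can be filled in on the client side. The following is a minimal sketch; `SOURCE_DEFAULTS` and `with_source_defaults` are hypothetical names, not part of the Unstructured SDK, and the default values are copied from the settings listed above.

```python
# Defaults as documented in the settings above (source connector only).
SOURCE_DEFAULTS = {
    "port": 9092,
    "group_id": "default_group_id",
    "num_messages_to_consume": 100,
}


def with_source_defaults(config: dict) -> dict:
    """Return a copy of config with documented defaults filled in for missing keys.

    Hypothetical helper for illustration; values already present in config win.
    """
    merged = {**SOURCE_DEFAULTS, **config}
    # batch_size applies to destination connectors only, so drop it for a source.
    merged.pop("batch_size", None)
    return merged


# Example: only the topic and a non-default port are supplied.
print(with_source_defaults({"topic": "<topic>", "port": 9093}))
```

The resulting dictionary can be combined with the required connection and authentication values and passed as the `config` argument shown in the Python SDK example.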

## Learn more

* <Icon icon="blog" />  [Unstructured Platform Now Integrates with Apache Kafka in Confluent Cloud](https://unstructured.io/blog/unstructured-platform-now-integrates-with-apache-kafka-in-confluent-cloud)
