> ## Documentation Index
> Fetch the complete documentation index at: https://docs.unstructured.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Kafka

Connect Kafka to your preprocessing pipeline, and use the Unstructured Ingest CLI or the Unstructured Ingest Python library to batch process all your documents and store structured outputs locally on your filesystem.

The requirements are as follows.

* A Kafka cluster in [Confluent Cloud](https://www.confluent.io/confluent-cloud).
  ([Create a cluster](https://docs.confluent.io/cloud/current/clusters/create-cluster.html#create-ak-clusters).)

  The following video shows how to set up a Kafka cluster in Confluent Cloud:

  <iframe width="560" height="315" src="https://www.youtube.com/embed/zcKJ96J4Xvk" title="YouTube video player" frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowfullscreen />

* The [hostname and port number](https://docs.confluent.io/cloud/current/clusters/create-cluster.html#view-a-ak-cluster) of the bootstrap Kafka cluster to connect to..

* The name of the topic to read messages from or write messages to on the cluster.
  [Create a topic](https://docs.confluent.io/cloud/current/client-apps/topics/index.html#create-topics).
  [Access available topics](https://docs.confluent.io/cloud/current/client-apps/topics/index.html#create-topics).

* For authentication, an [API key and secret](https://docs.confluent.io/cloud/current/security/authenticate/workload-identities/service-accounts/api-keys/manage-api-keys.html#add-an-api-key).

The Kafka connector dependencies:

```bash CLI, Python theme={null}
pip install "unstructured-ingest[kafka]"
```

You might also need to install additional dependencies, depending on your needs. [Learn more](/open-source/ingestion/ingest-dependencies).

The following environment variables:

* `KAFKA_BOOTSTRAP_SERVER` - The hostname of the bootstrap Kafka cluster to connect to, represented by `--bootstrap-server` (CLI) or `bootstrap_server` (Python).
* `KAFKA_PORT` - The port number of the cluster, represented by `--port` (CLI) or `port` (Python).
* `KAFKA_TOPIC` - The unique name of the topic to read messages from and write messages to on the cluster, represented by `--topic` (CLI) or `topic` (Python).

If you use Kafka API keys and secrets for authentication:

* `KAFKA_API_KEY` - The Kafka API key value, represented by `--kafka-api-key` (CLI) or `kafka_api_key` (Python).
* `KAFKA_SECRET` - The secret value for the Kafka API key, represented by `--secret` (CLI) or `secret` (Python).

Additional settings include:

* `--confluent` (CLI) or `confluent` (Python): True to indicate that the cluster is running Confluent Kafka.
* `--num-messages-to-consume` (CLI) or `num_messages_to_consume` (Python): The maximum number of messages to get from the topic. The default is `1` if not otherwise specified.
* `--timeout` (CLI) or `timeout` (Python): The maximum amount of time to wait for the response of a request to the topic, expressed in seconds. The default is `1.0` if not otherwise specified.
* `--group-id` (CLI) or `group_id` (Python): The ID of the consumer group, if any, that is associated with the target Kafka cluster.
  (A consumer group is a way to allow a pool of consumers to divide the consumption of data
  over topics and partitions.) The default is `default_group_id` if not otherwise specified.

Now call the Unstructured CLI or Python. The destination connector can be any of the ones supported. This example uses the local destination connector.

This example sends data to Unstructured for processing by default. To process data locally instead, see the instructions at the end of this page.

<CodeGroup>
  ```bash CLI theme={null}
  #!/usr/bin/env bash

  # Chunking and embedding are optional.

  unstructured-ingest \
    kafka \
      --bootstrap-server $KAFKA_BOOTSTRAP_SERVER \ 
      --port $KAFKA_PORT \
      --topic $KAFKA_TOPIC \
      --kafka-api-key $KAFKA_API_KEY \
      --secret $KAFKA_API_KEY \
      --confluent true \
      --batch-size 100 \
      --num-messages-to-consume 1 \
      --timeout 1.0 \
      --output-dir $LOCAL_FILE_OUTPUT_DIR \
      --chunking-strategy by_title \
      --embedding-provider huggingface \
      --partition-by-api \
      --api-key $UNSTRUCTURED_API_KEY \
      --partition-endpoint $UNSTRUCTURED_API_URL \
      --strategy hi_res \
      --additional-partition-args="{\"split_pdf_page\":\"true\", \"split_pdf_allow_failed\":\"true\", \"split_pdf_concurrency_level\": 15}"
  ```

  ```python Python Ingest theme={null}
  import os

  from unstructured_ingest.pipeline.pipeline import Pipeline
  from unstructured_ingest.interfaces import ProcessorConfig

  from unstructured_ingest.processes.connectors.kafka.cloud import (
      CloudKafkaIndexerConfig,
      CloudKafkaDownloaderConfig,
      CloudKafkaConnectionConfig,
      CloudKafkaAccessConfig
  )

  from unstructured_ingest.processes.partitioner import PartitionerConfig
  from unstructured_ingest.processes.chunker import ChunkerConfig
  from unstructured_ingest.processes.embedder import EmbedderConfig
  from unstructured_ingest.processes.connectors.local import LocalUploaderConfig

  # Chunking and embedding are optional.

  if __name__ == "__main__":
      Pipeline.from_configs(
          context=ProcessorConfig(),
          indexer_config=CloudKafkaIndexerConfig(
              topic=os.getenv("KAFKA_TOPIC"),
              num_messages_to_consume=100,
              timeout=1
          ),
          downloader_config=CloudKafkaDownloaderConfig(download_dir=os.getenv("LOCAL_FILE_DOWNLOAD_DIR")),
          source_connection_config=CloudKafkaConnectionConfig(
              access_config=CloudKafkaAccessConfig(
                  kafka_api_key=os.getenv("KAFKA_API_KEY"),
                  secret=os.getenv("KAFKA_SECRET")
              ),
              bootstrap_server=os.getenv("KAFKA_BOOTSTRAP_SERVER"),
              port=os.getenv("KAFKA_PORT")
          ),
          partitioner_config=PartitionerConfig(
              partition_by_api=True,
              api_key=os.getenv("UNSTRUCTURED_API_KEY"),
              partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
              additional_partition_args={
                  "split_pdf_page": True,
                  "split_pdf_allow_failed": True,
                  "split_pdf_concurrency_level": 15
              }
          ),
          chunker_config=ChunkerConfig(chunking_strategy="by_title"),
          embedder_config=EmbedderConfig(embedding_provider="huggingface"),
          uploader_config=LocalUploaderConfig(output_dir=os.getenv("LOCAL_FILE_OUTPUT_DIR"))
      ).run()
  ```
</CodeGroup>

For the Unstructured Ingest CLI and the Unstructured Ingest Python library, you can use the `--partition-by-api` option (CLI) or `partition_by_api` (Python) parameter to specify where files are processed:

* To do local file processing, omit `--partition-by-api` (CLI) or `partition_by_api` (Python), or explicitly specify `partition_by_api=False` (Python).

  Local file processing does not use an Unstructured API key or API URL, so you can also omit the following, if they appear:

  * `--api-key $UNSTRUCTURED_API_KEY` (CLI) or `api_key=os.getenv("UNSTRUCTURED_API_KEY")` (Python)
  * `--partition-endpoint $UNSTRUCTURED_API_URL` (CLI) or `partition_endpoint=os.getenv("UNSTRUCTURED_API_URL")` (Python)
  * The environment variables `UNSTRUCTURED_API_KEY` and `UNSTRUCTURED_API_URL`

* To send files to the legacy [Unstructured Partition Endpoint](/api-reference/legacy-api/partition/overview) for processing, specify `--partition-by-api` (CLI) or `partition_by_api=True` (Python).

  Unstructured also requires an Unstructured API key and API URL, by adding the following:

  * `--api-key $UNSTRUCTURED_API_KEY` (CLI) or `api_key=os.getenv("UNSTRUCTURED_API_KEY")` (Python)
  * `--partition-endpoint $UNSTRUCTURED_API_URL` (CLI) or `partition_endpoint=os.getenv("UNSTRUCTURED_API_URL")` (Python)
  * The environment variables `UNSTRUCTURED_API_KEY` and `UNSTRUCTURED_API_URL`, representing your API key and API URL, respectively.

  <Note>
    You must specify the API URL only if you are not using the default API URL for Unstructured Ingest, which applies to **Let's Go**, **Pay-As-You-Go**, and **Business SaaS** accounts.

    The default API URL for Unstructured Ingest is `https://api.unstructuredapp.io/general/v0/general`, which is the API URL for the legacy[Unstructured Partition Endpoint](/api-reference/legacy-api/partition/overview). However, you should always use the URL that was provided to you when your Unstructured account was created. If you do not have this URL, email Unstructured Support at [support@unstructured.io](mailto:support@unstructured.io).

    If you do not have an API key, [get one now](/api-reference/legacy-api/partition/overview).

    If you are using a **Business** account, the process
    for generating Unstructured API keys, and the Unstructured API URL that you use, are different.
    For instructions, see your Unstructured account administrator, or email Unstructured Support at [support@unstructured.io](mailto:support@unstructured.io).
  </Note>
