
# Kafka

Batch process all your records to store structured outputs in Kafka.

The requirements are as follows.

* A Kafka cluster in [Confluent Cloud](https://www.confluent.io/confluent-cloud).
  ([Create a cluster](https://docs.confluent.io/cloud/current/clusters/create-cluster.html#create-ak-clusters).)

  The following video shows how to set up a Kafka cluster in Confluent Cloud:

  <iframe width="560" height="315" src="https://www.youtube.com/embed/zcKJ96J4Xvk" title="YouTube video player" frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowfullscreen />

* The [hostname and port number](https://docs.confluent.io/cloud/current/clusters/create-cluster.html#view-a-ak-cluster) of the bootstrap Kafka cluster to connect to.

* The name of the topic to read messages from or write messages to on the cluster.
  [Create a topic](https://docs.confluent.io/cloud/current/client-apps/topics/index.html#create-topics).
  [Access available topics](https://docs.confluent.io/cloud/current/client-apps/topics/index.html#create-topics).

* For authentication, an [API key and secret](https://docs.confluent.io/cloud/current/security/authenticate/workload-identities/service-accounts/api-keys/manage-api-keys.html#add-an-api-key).

The Kafka connector dependencies:

```bash CLI, Python theme={null}
pip install "unstructured-ingest[kafka]"
```

You might also need to install additional dependencies, depending on your needs. [Learn more](/open-source/ingestion/ingest-dependencies).

The following environment variables:

* `KAFKA_BOOTSTRAP_SERVER` - The hostname of the bootstrap Kafka cluster to connect to, represented by `--bootstrap-server` (CLI) or `bootstrap_server` (Python).
* `KAFKA_PORT` - The port number of the cluster, represented by `--port` (CLI) or `port` (Python).
* `KAFKA_TOPIC` - The unique name of the topic to read messages from and write messages to on the cluster, represented by `--topic` (CLI) or `topic` (Python).

If you use Kafka API keys and secrets for authentication:

* `KAFKA_API_KEY` - The Kafka API key value, represented by `--kafka-api-key` (CLI) or `kafka_api_key` (Python).
* `KAFKA_SECRET` - The secret value for the Kafka API key, represented by `--secret` (CLI) or `secret` (Python).
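
If any of these variables is unset, the failure typically surfaces later as a less obvious connection or authentication error. One way to catch that early is to validate the environment up front. The helper below is a hypothetical sketch, not part of `unstructured-ingest`:

```python
import os

# The variable names match the environment variables described above.
REQUIRED_KAFKA_VARS = [
    "KAFKA_BOOTSTRAP_SERVER",
    "KAFKA_PORT",
    "KAFKA_TOPIC",
    "KAFKA_API_KEY",
    "KAFKA_SECRET",
]

def load_kafka_settings() -> dict:
    """Return the Kafka settings from the environment, raising if any are unset."""
    missing = [name for name in REQUIRED_KAFKA_VARS if not os.getenv(name)]
    if missing:
        raise EnvironmentError(
            f"Missing environment variables: {', '.join(missing)}"
        )
    return {name: os.environ[name] for name in REQUIRED_KAFKA_VARS}
```

Calling `load_kafka_settings()` at the top of your ingestion script turns a missing variable into an immediate, descriptive error instead of a mid-pipeline failure.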

Additional settings include:

* `--confluent` (CLI) or `confluent` (Python): True to indicate that the cluster is running Confluent Kafka.
* `--num-messages-to-consume` (CLI) or `num_messages_to_consume` (Python): The maximum number of messages to get from the topic. The default is `1` if not otherwise specified.
* `--timeout` (CLI) or `timeout` (Python): The maximum amount of time to wait for the response of a request to the topic, expressed in seconds. The default is `1.0` if not otherwise specified.
* `--group-id` (CLI) or `group_id` (Python): The ID of the consumer group, if any, that is associated with the target Kafka cluster.
  (A consumer group is a way to allow a pool of consumers to divide the consumption of data
  over topics and partitions.) The default is `default_group_id` if not otherwise specified.
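
Before running a pipeline, it can help to confirm that these settings actually reach your cluster. The sketch below is an assumption on our part, not part of the connector: it uses the `confluent-kafka` client (install it separately if it is not already in your environment) to build a SASL/SSL configuration from the environment variables above and list the cluster's topics:

```python
import os

def confluent_client_config(bootstrap_server: str, port: str,
                            api_key: str, secret: str) -> dict:
    """Build a librdkafka-style configuration for a Confluent Cloud cluster.

    SASL_SSL with the PLAIN mechanism is the standard setup for Confluent
    Cloud API keys; adjust if your cluster is configured differently.
    """
    return {
        "bootstrap.servers": f"{bootstrap_server}:{port}",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": api_key,
        "sasl.password": secret,
    }

if __name__ == "__main__" and os.getenv("KAFKA_BOOTSTRAP_SERVER"):
    # Only runs when the environment variables above are set.
    from confluent_kafka.admin import AdminClient

    admin = AdminClient(confluent_client_config(
        os.environ["KAFKA_BOOTSTRAP_SERVER"],
        os.environ["KAFKA_PORT"],
        os.environ["KAFKA_API_KEY"],
        os.environ["KAFKA_SECRET"],
    ))
    metadata = admin.list_topics(timeout=10)
    print(sorted(metadata.topics))  # your target topic should appear here
```

If the call times out or raises an authentication error, recheck the bootstrap server, port, API key, and secret before troubleshooting the connector itself.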

Now call the Unstructured Ingest CLI or the Unstructured Ingest Python library. The source connector can be any of the supported source connectors; this example uses the local source connector.

By default, this example sends files to Unstructured for processing. To process files locally instead, see the instructions at the end of this page.

<CodeGroup>
  ```bash CLI theme={null}
  #!/usr/bin/env bash

  # Chunking and embedding are optional.

  unstructured-ingest \
    local \
      --input-path $LOCAL_FILE_INPUT_DIR \
      --output-dir $LOCAL_FILE_OUTPUT_DIR \
      --chunk-elements \
      --embedding-provider huggingface \
      --num-processes 2 \
      --verbose \
      --partition-by-api \
      --api-key $UNSTRUCTURED_API_KEY \
      --partition-endpoint $UNSTRUCTURED_API_URL \
      --strategy hi_res \
      --additional-partition-args="{\"split_pdf_page\":\"true\", \"split_pdf_allow_failed\":\"true\", \"split_pdf_concurrency_level\": 15}" \
    kafka \
      --bootstrap-server $KAFKA_BOOTSTRAP_SERVER \
      --port $KAFKA_PORT \
      --topic $KAFKA_TOPIC \
      --kafka-api-key $KAFKA_API_KEY \
      --secret $KAFKA_SECRET \
      --confluent true \
      --num-messages-to-consume 1 \
      --timeout 1.0
  ```

  ```python Python Ingest theme={null}
  import os

  from unstructured_ingest.pipeline.pipeline import Pipeline
  from unstructured_ingest.interfaces import ProcessorConfig

  from unstructured_ingest.processes.connectors.local import (
      LocalIndexerConfig,
      LocalDownloaderConfig,
      LocalConnectionConfig
  )

  from unstructured_ingest.processes.partitioner import PartitionerConfig
  from unstructured_ingest.processes.chunker import ChunkerConfig
  from unstructured_ingest.processes.embedder import EmbedderConfig

  from unstructured_ingest.processes.connectors.kafka.cloud import (
      CloudKafkaConnectionConfig,
      CloudKafkaAccessConfig,
      CloudKafkaUploaderConfig
  )

  # Chunking and embedding are optional.

  if __name__ == "__main__":
      Pipeline.from_configs(
          context=ProcessorConfig(),
          indexer_config=LocalIndexerConfig(input_path=os.getenv("LOCAL_FILE_INPUT_DIR")),
          downloader_config=LocalDownloaderConfig(),
          source_connection_config=LocalConnectionConfig(),
          partitioner_config=PartitionerConfig(
              partition_by_api=True,
              api_key=os.getenv("UNSTRUCTURED_API_KEY"),
              partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
              additional_partition_args={
                  "split_pdf_page": True,
                  "split_pdf_allow_failed": True,
                  "split_pdf_concurrency_level": 15
              }
          ),
          chunker_config=ChunkerConfig(chunking_strategy="by_title"),
          embedder_config=EmbedderConfig(embedding_provider="huggingface"),
          destination_connection_config=CloudKafkaConnectionConfig(
              access_config=CloudKafkaAccessConfig(
                  kafka_api_key=os.getenv("KAFKA_API_KEY"),
                  secret=os.getenv("KAFKA_SECRET")
              ),
              bootstrap_server=os.getenv("KAFKA_BOOTSTRAP_SERVER"),
              port=os.getenv("KAFKA_PORT")
          ),
          uploader_config=CloudKafkaUploaderConfig(
              batch_size=100,
              topic=os.getenv("KAFKA_TOPIC"),
              timeout=10
          )
      ).run()
  ```
</CodeGroup>
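
After the pipeline finishes, you can optionally read a few messages back to confirm that records landed in the topic. This is a hedged sketch using the `confluent-kafka` client rather than anything provided by the connector; the `group.id` and poll timeout mirror the `--group-id` and `--timeout` settings described above:

```python
import os

def consumer_config(bootstrap_server: str, port: str, api_key: str,
                    secret: str, group_id: str = "default_group_id") -> dict:
    """Consumer configuration for a Confluent Cloud cluster (SASL/PLAIN assumed)."""
    return {
        "bootstrap.servers": f"{bootstrap_server}:{port}",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": api_key,
        "sasl.password": secret,
        "group.id": group_id,
        "auto.offset.reset": "earliest",  # read from the start of the topic
    }

if __name__ == "__main__" and os.getenv("KAFKA_TOPIC"):
    # Only runs when the environment variables above are set.
    from confluent_kafka import Consumer

    consumer = Consumer(consumer_config(
        os.environ["KAFKA_BOOTSTRAP_SERVER"],
        os.environ["KAFKA_PORT"],
        os.environ["KAFKA_API_KEY"],
        os.environ["KAFKA_SECRET"],
    ))
    consumer.subscribe([os.environ["KAFKA_TOPIC"]])
    try:
        msg = consumer.poll(timeout=10.0)  # seconds, like the --timeout setting
        if msg is not None and msg.error() is None:
            print(msg.value()[:200])  # a fragment of one uploaded record
    finally:
        consumer.close()
```

If `poll` returns `None`, either the topic is empty or the consumer group has already committed offsets past the uploaded records; trying a fresh `group_id` restarts from the earliest offset.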

For the Unstructured Ingest CLI and the Unstructured Ingest Python library, you can use the `--partition-by-api` option (CLI) or the `partition_by_api` parameter (Python) to specify where files are processed:

* To do local file processing, omit `--partition-by-api` (CLI) or `partition_by_api` (Python), or explicitly specify `partition_by_api=False` (Python).

  Local file processing does not use an Unstructured API key or API URL, so you can also omit the following, if they appear:

  * `--api-key $UNSTRUCTURED_API_KEY` (CLI) or `api_key=os.getenv("UNSTRUCTURED_API_KEY")` (Python)
  * `--partition-endpoint $UNSTRUCTURED_API_URL` (CLI) or `partition_endpoint=os.getenv("UNSTRUCTURED_API_URL")` (Python)
  * The environment variables `UNSTRUCTURED_API_KEY` and `UNSTRUCTURED_API_URL`

* To send files to the legacy [Unstructured Partition Endpoint](/api-reference/legacy-api/partition/overview) for processing, specify `--partition-by-api` (CLI) or `partition_by_api=True` (Python).

  This approach also requires an Unstructured API key and API URL, which you provide by adding the following:

  * `--api-key $UNSTRUCTURED_API_KEY` (CLI) or `api_key=os.getenv("UNSTRUCTURED_API_KEY")` (Python)
  * `--partition-endpoint $UNSTRUCTURED_API_URL` (CLI) or `partition_endpoint=os.getenv("UNSTRUCTURED_API_URL")` (Python)
  * The environment variables `UNSTRUCTURED_API_KEY` and `UNSTRUCTURED_API_URL`, representing your API key and API URL, respectively.

  <Note>
    You must specify the API URL only if you are not using the default API URL for Unstructured Ingest, which applies to **Let's Go**, **Pay-As-You-Go**, and **Business SaaS** accounts.

    The default API URL for Unstructured Ingest is `https://api.unstructuredapp.io/general/v0/general`, which is the API URL for the legacy [Unstructured Partition Endpoint](/api-reference/legacy-api/partition/overview). However, you should always use the URL that was provided to you when your Unstructured account was created. If you do not have this URL, email Unstructured Support at [support@unstructured.io](mailto:support@unstructured.io).

    If you do not have an API key, [get one now](/api-reference/legacy-api/partition/overview).

    If you are using a **Business** account, the process
    for generating Unstructured API keys, and the Unstructured API URL that you use, are different.
    For instructions, see your Unstructured account administrator, or email Unstructured Support at [support@unstructured.io](mailto:support@unstructured.io).
  </Note>
