If you’re new to Unstructured, read this note first.

Before you can create a destination connector, you must first sign up for Unstructured and get your Unstructured API key. After you sign up, the Unstructured user interface (UI) appears, which you use to get the key. To learn how, watch this 40-second how-to video.

After you create the destination connector, add it along with a source connector to a workflow. Then run the workflow as a job. To learn how, try out the hands-on Workflow Endpoint quickstart, go directly to the quickstart notebook, or watch the two 4-minute video tutorials for the Unstructured Python SDK.

You can also create destination connectors with the Unstructured user interface (UI). Learn how.

If you need help, reach out to the community on Slack, or contact us directly.

You are now ready to start creating a destination connector! Keep reading to learn how.

Send processed data from Unstructured to Milvus.

The requirements are as follows.

  • For Zilliz Cloud, you will need:

    • A Zilliz Cloud account.

    • A Zilliz Cloud cluster.

    • The URI of the cluster, also known as the cluster’s public endpoint, which takes a format such as https://<cluster-id>.<cluster-type>.<cloud-provider>-<region>.cloud.zilliz.com. Get the cluster’s public endpoint.

    • The token to access the cluster. Get the cluster’s token.

    • The name of the database in the instance.

    • The name of the collection in the database.

      The collection must have a defined schema before Unstructured can write to the collection. The minimum viable schema for Unstructured contains only the fields element_id, embeddings, and record_id, as follows:

      | Field Name                     | Field Type   | Max Length | Dimension | Index         | Metric Type |
      |--------------------------------|--------------|------------|-----------|---------------|-------------|
      | element_id (primary key field) | VARCHAR      | 200        |           |               |             |
      | embeddings (vector field)      | FLOAT_VECTOR |            | 3072      | Yes (Checked) | Cosine      |
      | record_id                      | VARCHAR      | 200        |           |               |             |
  • For Milvus on IBM watsonx.data, you will need:

  • For Milvus local, you will need:

All Milvus instances require the target collection to have a defined schema before Unstructured can write to it. The minimum viable schema for Unstructured contains only the fields element_id, embeddings, and record_id. The following example uses the Python SDK for Milvus (pymilvus) to create a collection with this minimum viable schema, targeting Milvus on IBM watsonx.data. To connect to other types of Milvus deployments, see your Milvus provider’s documentation for the connections.connect arguments to use:

Python
import os
from pymilvus import (
    connections,
    FieldSchema,
    DataType,
    CollectionSchema,
    Collection,
)

# Read the connection details from environment variables instead of hard-coding credentials.
connections.connect(
    alias="default",
    host=os.getenv("MILVUS_GRPC_HOST"),
    port=os.getenv("MILVUS_GRPC_PORT"),
    user=os.getenv("MILVUS_USER"),
    password=os.getenv("MILVUS_PASSWORD"),
    secure=True
)

primary_key = FieldSchema(
    name="element_id",
    dtype=DataType.VARCHAR,
    is_primary=True,
    max_length=200
)

# dim must equal the number of dimensions that your embedding model produces.
vector = FieldSchema(
    name="embeddings",
    dtype=DataType.FLOAT_VECTOR,
    dim=3072
)

record_id = FieldSchema(
    name="record_id",
    dtype=DataType.VARCHAR,
    max_length=200
)

schema = CollectionSchema(
    fields=[primary_key, vector, record_id],
    # Allow inserted records to carry fields that are not declared in this schema.
    enable_dynamic_field=True
)

collection = Collection(
    name="my_collection",
    schema=schema,
    using="default"
)

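# Index the vector field. This example uses an IVF_FLAT index with the L2 (Euclidean distance) metric.
index_params = {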
    "metric_type": "L2",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 1024}
}

collection.create_index(
    field_name="embeddings",
    index_params=index_params
)

Other approaches, such as creating collections instantly or setting nullable and default fields, have not been fully evaluated by Unstructured and might produce unexpected results.

Unstructured cannot provide a schema that is guaranteed to work in all circumstances. This is because these schemas will vary based on your source files’ types; how you want Unstructured to partition, chunk, and generate embeddings; any custom post-processing code that you run; and other factors.
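Because the schema can vary, it may help to confirm that a collection's fields include at least the three minimum fields before you run a workflow against it. The following is a minimal sketch in plain Python, not part of Unstructured or pymilvus. The helper name find_schema_problems and the list-of-dictionaries layout are assumptions made for illustration; you would need to build that list yourself from whatever your Milvus client reports about the collection.

```python
# Minimum fields named in this article, mapped to their expected field types.
# The dictionary layout used here is an assumption for illustration only.
REQUIRED_FIELDS = {
    "element_id": "VARCHAR",
    "embeddings": "FLOAT_VECTOR",
    "record_id": "VARCHAR",
}


def find_schema_problems(fields):
    """Return a list of problems; an empty list means the minimum schema is present."""
    by_name = {field["name"]: field for field in fields}
    problems = []

    # Check that every required field exists and has the expected type.
    for name, expected_type in REQUIRED_FIELDS.items():
        field = by_name.get(name)
        if field is None:
            problems.append(f"missing field: {name}")
        elif field["type"] != expected_type:
            problems.append(f"{name}: expected {expected_type}, found {field['type']}")

    # element_id must also be the collection's primary key.
    primary = by_name.get("element_id")
    if primary is not None and not primary.get("is_primary", False):
        problems.append("element_id is not the primary key")

    return problems


# Hypothetical description of a collection that lacks the record_id field.
example_fields = [
    {"name": "element_id", "type": "VARCHAR", "is_primary": True},
    {"name": "embeddings", "type": "FLOAT_VECTOR"},
]
print(find_schema_problems(example_fields))
```

A check like this only covers the minimum fields. It does not verify the vector dimension, the index, or any additional fields that your particular workflow might write.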

To create a Milvus destination connector, see the following example.

import os

from unstructured_client import UnstructuredClient
from unstructured_client.models.operations import CreateDestinationRequest
from unstructured_client.models.shared import (
    CreateDestinationConnector,
    DestinationConnectorType,
    MilvusDestinationConnectorConfigInput
)

with UnstructuredClient(api_key_auth=os.getenv("UNSTRUCTURED_API_KEY")) as client:
    response = client.destinations.create_destination(
        request=CreateDestinationRequest(
            create_destination_connector=CreateDestinationConnector(
                name="<name>",
                type=DestinationConnectorType.MILVUS,
                config=MilvusDestinationConnectorConfigInput(
                    user="<user>",
                    uri="<uri>",
                    db_name="<db-name>",
                    password="<password>",
                    collection_name="<collection-name>"
                )
            )
        )
    )

    print(response.destination_connector_information)

Replace the preceding placeholders as follows:

  • <name> (required) - A unique name for this connector.
  • <user> (required) - The username to access the Milvus instance.
  • <uri> (required) - The URI of the instance, for example: https://12345.serverless.gcp-us-west1.cloud.zilliz.com.
  • <db-name> (required) - The name of the database in the instance.
  • <password> (required) - The password corresponding to the username to access the instance.
  • <collection-name> (required) - The name of the collection in the database.
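Rather than pasting the placeholder values into your code, you might read them from environment variables and fail early if any required value is missing. The following is a minimal sketch in plain Python. The environment variable names (MILVUS_URI and so on) and the helper name load_milvus_settings are hypothetical and are not defined by Unstructured; only the setting names on the right-hand side come from the preceding example.

```python
import os

# Hypothetical environment variable names, mapped to the connector settings
# used in the preceding example. Rename them to suit your deployment.
ENV_TO_SETTING = {
    "MILVUS_URI": "uri",
    "MILVUS_USER": "user",
    "MILVUS_PASSWORD": "password",
    "MILVUS_DB_NAME": "db_name",
    "MILVUS_COLLECTION_NAME": "collection_name",
}


def load_milvus_settings(env=os.environ):
    """Collect the connector settings, raising ValueError if any is unset or blank."""
    settings = {}
    missing = []
    for env_name, setting_name in ENV_TO_SETTING.items():
        value = env.get(env_name, "").strip()
        if value:
            settings[setting_name] = value
        else:
            missing.append(env_name)
    if missing:
        raise ValueError("Missing required settings: " + ", ".join(missing))
    return settings
```

Because the returned keys match the keyword arguments shown in the preceding example, the result could likely be passed along as MilvusDestinationConnectorConfigInput(**load_milvus_settings()). Treat that as an assumption to verify against the SDK version you have installed.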