Complete the requirements before you begin. You can also learn about Unstructured’s partitioning, chunking, and embedding before you begin.
Create the on-demand job
Replace INPUT_DIR with the path to your local directory of files to process. The response includes the job ID.
Each on-demand job is limited to 10 files, and each file is limited to 10 MB in size.
If you need to launch a series of on-demand jobs in rapid succession, you must wait at least one second between launch requests. Otherwise, you will receive a rate limit error.
A maximum of 5 on-demand jobs can be running in your Unstructured account at one time. If you launch a new on-demand job while 5 on-demand jobs are still running, the new job remains in a scheduled state until one of the 5 running jobs finishes.
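The limits above can be checked locally before anything is uploaded. The following is a minimal sketch, not part of the Unstructured client: `plan_batches`, `scan_directory`, and `launch_spaced` are hypothetical helper names, "10 MB" is assumed to mean 10 × 1024 × 1024 bytes, and `launch` stands in for whatever function actually creates a job.

```python
import os
import time

MAX_FILES_PER_JOB = 10               # stated limit: 10 files per on-demand job
MAX_FILE_BYTES = 10 * 1024 * 1024    # assumption: "10 MB" read as 10 MiB
MIN_LAUNCH_INTERVAL = 1.0            # stated limit: at least one second between launches


def scan_directory(input_dir):
    """Map each regular file in `input_dir` to its size in bytes."""
    sizes = {}
    for name in sorted(os.listdir(input_dir)):
        path = os.path.join(input_dir, name)
        if os.path.isfile(path):
            sizes[path] = os.path.getsize(path)
    return sizes


def plan_batches(sizes_by_path):
    """Split files into job-sized batches and set aside files that are too large.

    Returns (batches, oversized); each batch holds at most 10 paths.
    """
    accepted = [p for p, size in sizes_by_path.items() if size <= MAX_FILE_BYTES]
    oversized = [p for p, size in sizes_by_path.items() if size > MAX_FILE_BYTES]
    batches = [
        accepted[i:i + MAX_FILES_PER_JOB]
        for i in range(0, len(accepted), MAX_FILES_PER_JOB)
    ]
    return batches, oversized


def launch_spaced(batches, launch, sleep=time.sleep):
    """Call `launch(batch)` for each batch, pausing between consecutive launches."""
    results = []
    for index, batch in enumerate(batches):
        if index > 0:
            sleep(MIN_LAUNCH_INTERVAL)
        results.append(launch(batch))
    return results
```

A caller might run `batches, oversized = plan_batches(scan_directory(INPUT_DIR))`, report the oversized files, and pass the batches to `launch_spaced`. The limit of 5 running jobs needs no handling here, because additional jobs simply stay scheduled until a slot frees up.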
- curl
- Python
Save and run this script. The curl script requires jq to parse the JSON response.
#!/usr/bin/env bash
INPUT_DIR="/full/path/to/your/directory"
# Build one --form argument per regular file in INPUT_DIR.
form_args=()
for filepath in "$INPUT_DIR"/*; do
  [ -f "$filepath" ] || continue
  filename=$(basename "$filepath")
  mimetype=$(file --mime-type -b "$filepath")
  form_args+=(--form "input_files=@${filepath};filename=${filename};type=${mimetype}")
done
# Create the job: partition, then chunk by title, then embed.
response=$(curl --request POST --location \
  "$UNSTRUCTURED_API_URL/api/v1/jobs/" \
  --header "accept: application/json" \
  --header "unstructured-api-key: $UNSTRUCTURED_API_KEY" \
  --form 'request_data={"job_nodes":[{"name":"Partitioner","type":"partition","subtype":"vlm","settings":{"is_dynamic":true,"allow_fast":true}},{"name":"Chunker","type":"chunk","subtype":"chunk_by_title","settings":{"max_characters":2048,"new_after_n_chars":1500,"overlap":160}},{"name":"Embedder","type":"embed","subtype":"azure_openai","settings":{"model_name":"text-embedding-3-large"}}]}' \
  "${form_args[@]}")
echo "Job ID: $(echo "$response" | jq -r '.id')"
echo "Input file IDs: $(echo "$response" | jq -c '.input_file_ids')"
import mimetypes
import os
import json
from unstructured_client import UnstructuredClient
from unstructured_client.models.operations import CreateJobRequest
from unstructured_client.models.shared import BodyCreateJob, InputFiles
INPUT_DIR = "/full/path/to/your/directory"
client = UnstructuredClient(
    api_key_auth=os.getenv("UNSTRUCTURED_API_KEY"),
    server_url=os.getenv("UNSTRUCTURED_API_URL")
)
# Collect every regular file in INPUT_DIR as an upload.
input_files = []
for filename in os.listdir(INPUT_DIR):
    full_path = os.path.join(INPUT_DIR, filename)
    if not os.path.isfile(full_path):
        continue
    content_type, _ = mimetypes.guess_type(full_path)
    input_files.append(
        InputFiles(
            content=open(full_path, "rb"),
            file_name=filename,
            content_type=content_type or "application/octet-stream"
        )
    )
# Create the job: partition, then chunk by title, then embed.
response = client.jobs.create_job(
    request=CreateJobRequest(
        body_create_job=BodyCreateJob(
            request_data=json.dumps({
                "job_nodes": [
                    {
                        "name": "Partitioner",
                        "type": "partition",
                        "subtype": "vlm",
                        "settings": {
                            "is_dynamic": True,
                            "allow_fast": True
                        }
                    },
                    {
                        "name": "Chunker",
                        "type": "chunk",
                        "subtype": "chunk_by_title",
                        "settings": {
                            "max_characters": 2048,
                            "new_after_n_chars": 1500,
                            "overlap": 160
                        }
                    },
                    {
                        "name": "Embedder",
                        "type": "embed",
                        "subtype": "azure_openai",
                        "settings": {
                            "model_name": "text-embedding-3-large"
                        }
                    }
                ]
            }),
            input_files=input_files
        )
    )
)
job_info = response.job_information
print(f"Job ID: {job_info.id}")
print(f"Input file IDs: {job_info.input_file_ids}")
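The Python script above passes `open(full_path, "rb")` directly into `InputFiles` and never closes the handles. One possible way to tidy that up is to register each handle on a `contextlib.ExitStack`. The sketch below is an assumption about how you might structure it, not something the Unstructured client requires; `open_input_files` is a hypothetical helper, and the demonstration uses a throwaway temporary directory in place of a real INPUT_DIR.

```python
import contextlib
import os
import tempfile


def open_input_files(stack, input_dir):
    """Open each regular file in `input_dir` in binary mode.

    Every handle is registered on `stack`, so leaving the enclosing
    `with` block closes all of them, even if an error is raised.
    """
    opened = []
    for name in sorted(os.listdir(input_dir)):
        path = os.path.join(input_dir, name)
        if os.path.isfile(path):
            opened.append((name, stack.enter_context(open(path, "rb"))))
    return opened


# Demonstration with a temporary directory and two sample files.
with tempfile.TemporaryDirectory() as demo_dir:
    for name in ("a.txt", "b.txt"):
        with open(os.path.join(demo_dir, name), "w") as f:
            f.write("sample")
    with contextlib.ExitStack() as stack:
        opened = open_input_files(stack, demo_dir)
        names = [name for name, _ in opened]
        # A real script would build InputFiles(...) and call create_job here,
        # while the handles are still open.
    all_closed = all(handle.closed for _, handle in opened)

print(names, all_closed)  # ['a.txt', 'b.txt'] True
```

The job request has to be sent inside the `with contextlib.ExitStack()` block, because the handles are closed as soon as that block ends.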
Poll for job status
Replace JOB_ID with the job ID from the previous step. This script polls every 10 seconds and stops when the job completes, fails, or is stopped.
- curl
- Python
Save and run this script. The curl script requires jq to parse the JSON response.
#!/usr/bin/env bash
JOB_ID="<job-id>"
while true; do
  job=$(curl --request GET --silent --location \
    "$UNSTRUCTURED_API_URL/api/v1/jobs/$JOB_ID" \
    --header "accept: application/json" \
    --header "unstructured-api-key: $UNSTRUCTURED_API_KEY")
  status=$(echo "$job" | jq -r '.status')
  echo "Job status: $status"
  if [ "$status" = "COMPLETED" ]; then
    echo "Job completed."
    echo "Output node file IDs: $(echo "$job" | jq -c '[.output_node_files[].file_id]')"
    break
  elif [ "$status" = "FAILED" ] || [ "$status" = "STOPPED" ]; then
    echo "Job did not complete successfully: $status"
    exit 1
  fi
  sleep 10
done
import os
import time
from unstructured_client import UnstructuredClient
JOB_ID = "<job-id>"
client = UnstructuredClient(
    api_key_auth=os.getenv("UNSTRUCTURED_API_KEY"),
    server_url=os.getenv("UNSTRUCTURED_API_URL")
)
# Check the job every 10 seconds until it reaches a final state.
while True:
    response = client.jobs.get_job(request={"job_id": JOB_ID})
    job_info = response.job_information
    status = job_info.status
    print(f"Job status: {status.value}")
    if status == "COMPLETED":
        print("Job completed.")
        print(f"Input file IDs: {job_info.input_file_ids}")
        print(f"Output node file IDs: {[f.file_id for f in (job_info.output_node_files or [])]}")
        break
    elif status in ("FAILED", "STOPPED"):
        raise RuntimeError(f"Job did not complete successfully: {status.value}")
    time.sleep(10)
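The polling loops above run until the job reaches COMPLETED, FAILED, or STOPPED, with no upper bound on how long they wait. If you want a bound, one option is to wrap the loop in a function with a deadline. This is a sketch under assumptions: `wait_for_job` is a hypothetical helper, the 30-minute default timeout is arbitrary, and `get_status` is any callable that returns the current status string.

```python
import time

TERMINAL_FAILURES = ("FAILED", "STOPPED")


def wait_for_job(get_status, interval=10.0, timeout=1800.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll `get_status()` until the job completes, fails, or the timeout passes.

    Returns "COMPLETED" on success. Raises RuntimeError on FAILED or STOPPED,
    and TimeoutError if the deadline passes while the job is still running.
    """
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status == "COMPLETED":
            return status
        if status in TERMINAL_FAILURES:
            raise RuntimeError(f"Job did not complete successfully: {status}")
        if clock() >= deadline:
            raise TimeoutError(f"Job still {status} after {timeout} seconds")
        sleep(interval)
```

With the client from the script above, the call could look like `wait_for_job(lambda: client.jobs.get_job(request={"job_id": JOB_ID}).job_information.status.value)`. Passing `sleep` and `clock` as parameters keeps the function easy to exercise without real delays.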
Download the job output
Replace JOB_ID, INPUT_FILE_IDS, OUTPUT_NODE_FILE_IDS, and OUTPUT_DIR with your values from the previous steps.
- curl
- Python
Save and run this script:
#!/usr/bin/env bash
JOB_ID="<job-id>"
INPUT_FILE_IDS=("<input-file-id>") # From Step 1
OUTPUT_NODE_FILE_IDS=("<output-file-id>") # From Step 2
OUTPUT_DIR="/full/path/to/your/output/directory"
mkdir -p "$OUTPUT_DIR"
all_file_ids=()
# Merge both ID lists, skipping any ID that was already added.
for file_id in "${INPUT_FILE_IDS[@]}" "${OUTPUT_NODE_FILE_IDS[@]}"; do
  printf '%s\n' "${all_file_ids[@]}" | grep -qxF "$file_id" || all_file_ids+=("$file_id")
done
# Download each file to OUTPUT_DIR as <file-id>.json.
for file_id in "${all_file_ids[@]}"; do
  curl --request GET --silent --location \
    "$UNSTRUCTURED_API_URL/api/v1/jobs/$JOB_ID/download?file_id=$file_id" \
    --header "accept: application/json" \
    --header "unstructured-api-key: $UNSTRUCTURED_API_KEY" \
    --output "$OUTPUT_DIR/$file_id.json"
  echo "Saved: $OUTPUT_DIR/$file_id.json"
done
import json
import os
from unstructured_client import UnstructuredClient
from unstructured_client.models.operations import DownloadJobOutputRequest
JOB_ID = "<job-id>"
INPUT_FILE_IDS = ["<input-file-id>"] # From Step 1
OUTPUT_NODE_FILE_IDS = ["<output-file-id>"] # From Step 2
OUTPUT_DIR = "/full/path/to/your/output/directory"
client = UnstructuredClient(
    api_key_auth=os.getenv("UNSTRUCTURED_API_KEY"),
    server_url=os.getenv("UNSTRUCTURED_API_URL")
)
os.makedirs(OUTPUT_DIR, exist_ok=True)
# dict.fromkeys drops repeated file IDs while keeping their order.
for file_id in dict.fromkeys(INPUT_FILE_IDS + OUTPUT_NODE_FILE_IDS):
    response = client.jobs.download_job_output(
        request=DownloadJobOutputRequest(job_id=JOB_ID, file_id=file_id)
    )
    output_path = os.path.join(OUTPUT_DIR, f"{file_id}.json")
    with open(output_path, "w") as f:
        json.dump(response.any, f, indent=4)
    print(f"Saved: {output_path}")
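The Python download loop above uses `dict.fromkeys` to merge the two ID lists, so that an ID appearing in both is downloaded only once and the first-seen order is kept. A small illustration of that behavior follows; `unique_in_order` is a hypothetical helper and the IDs are made up.

```python
def unique_in_order(*id_lists):
    """Merge lists of file IDs, dropping repeats and keeping first-seen order."""
    merged = [file_id for ids in id_lists for file_id in ids]
    return list(dict.fromkeys(merged))


# Hypothetical IDs, for illustration only.
input_ids = ["file-a", "file-b"]
output_ids = ["file-b", "file-c"]
print(unique_in_order(input_ids, output_ids))  # ['file-a', 'file-b', 'file-c']
```

A `set` would also remove the repeats, but it would not preserve the order of the IDs.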
Complete end-to-end script
Replace INPUT_DIR and OUTPUT_DIR with your directory paths, then save and run this script.
- curl
- Python
This script requires jq to parse JSON responses.
#!/usr/bin/env bash
INPUT_DIR="/full/path/to/your/input/directory"
OUTPUT_DIR="/full/path/to/your/output/directory"
# Step 1: Create the on-demand job.
form_args=()
for filepath in "$INPUT_DIR"/*; do
  [ -f "$filepath" ] || continue
  filename=$(basename "$filepath")
  mimetype=$(file --mime-type -b "$filepath")
  form_args+=(--form "input_files=@${filepath};filename=${filename};type=${mimetype}")
done
response=$(curl --request POST --location \
  "$UNSTRUCTURED_API_URL/api/v1/jobs/" \
  --header "accept: application/json" \
  --header "unstructured-api-key: $UNSTRUCTURED_API_KEY" \
  --form 'request_data={"job_nodes":[{"name":"Partitioner","type":"partition","subtype":"vlm","settings":{"is_dynamic":true,"allow_fast":true}},{"name":"Chunker","type":"chunk","subtype":"chunk_by_title","settings":{"max_characters":2048,"new_after_n_chars":1500,"overlap":160}},{"name":"Embedder","type":"embed","subtype":"azure_openai","settings":{"model_name":"text-embedding-3-large"}}]}' \
  "${form_args[@]}")
JOB_ID=$(echo "$response" | jq -r '.id')
input_file_ids=$(echo "$response" | jq -r '.input_file_ids[]')
echo "Job ID: $JOB_ID"
# Step 2: Poll until the job completes.
while true; do
  job=$(curl --request GET --silent --location \
    "$UNSTRUCTURED_API_URL/api/v1/jobs/$JOB_ID" \
    --header "accept: application/json" \
    --header "unstructured-api-key: $UNSTRUCTURED_API_KEY")
  status=$(echo "$job" | jq -r '.status')
  echo "Job status: $status"
  if [ "$status" = "COMPLETED" ]; then
    echo "Job completed."
    break
  elif [ "$status" = "FAILED" ] || [ "$status" = "STOPPED" ]; then
    echo "Job did not complete successfully: $status"
    exit 1
  fi
  sleep 10
done
output_node_file_ids=$(echo "$job" | jq -r '.output_node_files[].file_id')
# Step 3: Download the job output.
mkdir -p "$OUTPUT_DIR"
all_file_ids=()
for file_id in $input_file_ids $output_node_file_ids; do
  printf '%s\n' "${all_file_ids[@]}" | grep -qxF "$file_id" || all_file_ids+=("$file_id")
done
for file_id in "${all_file_ids[@]}"; do
  curl --request GET --silent --location \
    "$UNSTRUCTURED_API_URL/api/v1/jobs/$JOB_ID/download?file_id=$file_id" \
    --header "accept: application/json" \
    --header "unstructured-api-key: $UNSTRUCTURED_API_KEY" \
    --output "$OUTPUT_DIR/$file_id.json"
  echo "Saved: $OUTPUT_DIR/$file_id.json"
done
import json
import mimetypes
import os
import time
from unstructured_client import UnstructuredClient
from unstructured_client.models.operations import CreateJobRequest, DownloadJobOutputRequest
from unstructured_client.models.shared import BodyCreateJob, InputFiles
INPUT_DIR = "/full/path/to/your/input/directory"
OUTPUT_DIR = "/full/path/to/your/output/directory"
client = UnstructuredClient(
    api_key_auth=os.getenv("UNSTRUCTURED_API_KEY"),
    server_url=os.getenv("UNSTRUCTURED_API_URL")
)
# Step 1: Create the on-demand job.
input_files = []
for filename in os.listdir(INPUT_DIR):
    full_path = os.path.join(INPUT_DIR, filename)
    if not os.path.isfile(full_path):
        continue
    content_type, _ = mimetypes.guess_type(full_path)
    input_files.append(
        InputFiles(
            content=open(full_path, "rb"),
            file_name=filename,
            content_type=content_type or "application/octet-stream"
        )
    )
response = client.jobs.create_job(
    request=CreateJobRequest(
        body_create_job=BodyCreateJob(
            request_data=json.dumps({
                "job_nodes": [
                    {
                        "name": "Partitioner",
                        "type": "partition",
                        "subtype": "vlm",
                        "settings": {
                            "is_dynamic": True,
                            "allow_fast": True
                        }
                    },
                    {
                        "name": "Chunker",
                        "type": "chunk",
                        "subtype": "chunk_by_title",
                        "settings": {
                            "max_characters": 2048,
                            "new_after_n_chars": 1500,
                            "overlap": 160
                        }
                    },
                    {
                        "name": "Embedder",
                        "type": "embed",
                        "subtype": "azure_openai",
                        "settings": {
                            "model_name": "text-embedding-3-large"
                        }
                    }
                ]
            }),
            input_files=input_files
        )
    )
)
job_id = response.job_information.id
input_file_ids = response.job_information.input_file_ids
print(f"Job ID: {job_id}")
# Step 2: Poll until the job completes.
while True:
    response = client.jobs.get_job(request={"job_id": job_id})
    job_info = response.job_information
    status = job_info.status
    print(f"Job status: {status.value}")
    if status == "COMPLETED":
        print("Job completed.")
        break
    elif status in ("FAILED", "STOPPED"):
        raise RuntimeError(f"Job did not complete successfully: {status.value}")
    time.sleep(10)
output_node_file_ids = [f.file_id for f in (job_info.output_node_files or [])]
# Step 3: Download the job output.
os.makedirs(OUTPUT_DIR, exist_ok=True)
for file_id in dict.fromkeys(input_file_ids + output_node_file_ids):
    response = client.jobs.download_job_output(
        request=DownloadJobOutputRequest(job_id=job_id, file_id=file_id)
    )
    output_path = os.path.join(OUTPUT_DIR, f"{file_id}.json")
    with open(output_path, "w") as f:
        json.dump(response.any, f, indent=4)
    print(f"Saved: {output_path}")
What’s next?
- Create on-demand jobs that only partition, add enrichments, or perform structured data extractions.
- Learn about the Unstructured API’s other workflow operations.

