import asyncio
import os
import json
import unstructured_client
from unstructured_client.models import shared
# Module-wide Unstructured API client shared by every request below. The API
# key is taken from the UNSTRUCTURED_API_KEY environment variable once, at
# import time (None if the variable is unset).
client = unstructured_client.UnstructuredClient(
    api_key_auth=os.environ.get("UNSTRUCTURED_API_KEY"),
)
async def call_api(filename, input_dir, output_dir):
    """Partition one file with the Unstructured API and save its elements as JSON.

    The result is written to ``<output_dir>/<path relative to input_dir>/<basename>.json``,
    so the output tree mirrors the input tree.

    Any exception (unreadable input, API failure, write failure) is printed
    and swallowed, so one bad file does not abort the other files that
    ``process_files`` runs alongside it.

    Args:
        filename: Path of the file to partition.
        input_dir: Root directory that ``filename`` lives under; used to
            compute the relative output location.
        output_dir: Root directory that receives the JSON output.
    """
    try:
        # Open inside try/with so the handle is always closed and a missing or
        # unreadable input is reported like any other per-file error.
        with open(filename, "rb") as content:
            req = {
                "partition_parameters": {
                    "files": {
                        "content": content,
                        "file_name": os.path.basename(filename),
                    },
                    "strategy": shared.Strategy.VLM,
                    "vlm_model": "gpt-4o",
                    "vlm_model_provider": "openai",
                    "languages": ["eng"],
                    # If True, splits the PDF file into smaller chunks of pages.
                    "split_pdf_page": True,
                    # If True, the partitioning continues even if some pages fail.
                    "split_pdf_allow_failed": True,
                    # Number of concurrent requests, set to the maximum value: 15.
                    "split_pdf_concurrency_level": 15,
                }
            }
            # The request must complete while the file is still open.
            res = await client.general.partition_async(request=req)

        json_elements = json.dumps(list(res.elements), indent=2)

        # Mirror the input directory structure under output_dir.
        relative_path = os.path.relpath(os.path.dirname(filename), input_dir)
        output_subdir = os.path.join(output_dir, relative_path)
        os.makedirs(output_subdir, exist_ok=True)

        output_filename = os.path.join(
            output_subdir, os.path.basename(filename) + ".json"
        )
        with open(output_filename, "w", encoding="utf-8") as file:
            file.write(json_elements)
    except Exception as e:
        # Best-effort batch processing: report and move on to other files.
        print(f"Error processing {filename}: {e}")
async def process_files(input_directory, output_directory, max_concurrency=None):
    """Partition every non-``.json`` file under ``input_directory`` concurrently.

    Each file is handed to ``call_api``, which writes its JSON result under
    ``output_directory`` using the same relative layout as the input tree.

    Args:
        input_directory: Root directory walked recursively for input files.
        output_directory: Root directory that receives the JSON output.
        max_concurrency: Optional upper bound on how many files are in flight
            at once. This also bounds how many input files are open and how many
            partition calls run at the same time. ``None`` (the default) starts
            every file together, matching the previous behaviour.

    Raises:
        ValueError: If ``max_concurrency`` is given but is less than 1.
    """
    if max_concurrency is not None and max_concurrency < 1:
        raise ValueError("max_concurrency must be a positive integer or None")

    semaphore = asyncio.Semaphore(max_concurrency) if max_concurrency else None

    async def _process_one(path):
        # Run call_api for one file, honouring the optional concurrency cap.
        if semaphore is None:
            await call_api(path, input_directory, output_directory)
            return
        async with semaphore:
            await call_api(path, input_directory, output_directory)

    tasks = []
    for root, _, files in os.walk(input_directory):
        for name in files:
            # NOTE(review): .json files are skipped, presumably so previously
            # written outputs are not re-submitted -- confirm intent.
            if not name.endswith(".json"):
                tasks.append(_process_one(os.path.join(root, name)))
    await asyncio.gather(*tasks)
if __name__ == "__main__":
    # Fail fast with a clear message. Without these checks:
    # - a missing input dir makes os.walk raise an obscure TypeError;
    # - a missing output dir lets every file be sent to the API, only for
    #   the write step to fail afterwards.
    input_dir = os.getenv("LOCAL_FILE_INPUT_DIR")
    output_dir = os.getenv("LOCAL_FILE_OUTPUT_DIR")
    missing = [
        var_name
        for var_name, value in (
            ("LOCAL_FILE_INPUT_DIR", input_dir),
            ("LOCAL_FILE_OUTPUT_DIR", output_dir),
        )
        if not value
    ]
    if missing:
        raise SystemExit(
            f"Missing required environment variable(s): {', '.join(missing)}"
        )
    asyncio.run(process_files(
        input_directory=input_dir,
        output_directory=output_dir,
    ))