Add a large number of documents

Note

The default throttle is set at 6 file uploads per minute.

Here is a boilerplate script for uploading a large number of documents. You can launch the script with something like:

python3 script.py /path/to/files/folder --api_key <my_api_key>
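
If you prefer not to pass the key on the command line, the script falls back to the PARADIGM_API_KEY environment variable when --api_key is omitted:

export PARADIGM_API_KEY=<my_api_key>
python3 script.py /path/to/files/folder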

The script below displays a progress bar and sends files in parallel batches of a given size (10 by default; if you encounter throttling issues, try reducing the batch size).

from openai import AsyncOpenAI
from pathlib import Path
import os
import asyncio
from tqdm.asyncio import tqdm
import json

SUPPORTED_FILE_FORMATS = ["PDF", "pdf", "docx", "doc", "DOC", "md", "MD"]


# Return the set of relative file paths already recorded in the upload log.
def load_uploaded_files(log_path: Path):
	if log_path.exists():
		with open(log_path, "r") as f:
			return set(json.load(f))
	return set()


# Record a successfully uploaded file in the JSON log so it is skipped on the next run.
def save_uploaded_file(relative_file_path: Path, log_path: Path):
	if log_path.exists():
		with open(log_path, "r") as f:
			uploaded_files = set(json.load(f))
	else:
		uploaded_files = set()

	uploaded_files.add(str(relative_file_path))

	with open(log_path, "w") as f:
		json.dump(list(uploaded_files), f)


# Upload a single file, using the semaphore to cap the number of concurrent uploads.
async def upload_file(
		api_key: str, base_url: str, file_path: Path,
		folder_path: Path, semaphore: asyncio.Semaphore
):
	async with semaphore:
		client = AsyncOpenAI(api_key=api_key, base_url=base_url)
		# Use a context manager so the file handle is closed once the upload completes.
		with open(file_path, "rb") as f:
			response = await client.files.create(file=f, purpose="documents")
		return response, file_path.relative_to(folder_path)


# Collect the files to upload, skip those already logged, and upload the rest while showing a progress bar.
async def main(folder_path: Path, base_url: str, api_key: str, batch_size: int, log_path: Path):
	uploaded_files = load_uploaded_files(log_path)

	files_list = [
		file for format in SUPPORTED_FILE_FORMATS
		for file in folder_path.rglob(f'*.{format}')
		if str(file.relative_to(folder_path)) not in uploaded_files
	]

	print(f"Detected files to upload: {len(files_list)}")

	semaphore = asyncio.Semaphore(batch_size)

	with tqdm(total=len(files_list)) as pb:
		tasks = [upload_file(api_key, base_url, file, folder_path, semaphore) for file in files_list]
		responses = []
		for task in asyncio.as_completed(tasks):
			resp, relative_path = await task
			responses.append(resp)
			if resp.status == "success":
				save_uploaded_file(relative_path, log_path)
			elif resp.status == "failed":
				print(f"Failed upload: {resp}")
			pb.update(1)

if __name__ == "__main__":
	import argparse

	parser = argparse.ArgumentParser(
		description="Script to upload multiple files to Paradigm"
	)
	parser.add_argument(
		"folder_path", type=Path,
		help="Path to the folder containing the documents to upload."
	)
	parser.add_argument(
		"--log_path", default=Path.cwd() / Path("uploaded_files.json"), type=Path,
		help="Path to the JSON file to log uploaded files."
	)
	parser.add_argument(
		"--api_key", default=None,
		help="Paradigm API key to use."
	)
	parser.add_argument(
		"--base_url", default="https://paradigm.lighton.ai/api/v2",
		help="Base url to use."
	)
	parser.add_argument(
		"--batch_size", default=10, type=int,
		help="Number of parallel processes to run"
	)
	args = parser.parse_args()

	if args.api_key is None:
		api_key = os.getenv("PARADIGM_API_KEY", None)
	else:
		api_key = args.api_key

	asyncio.run(
		main(
			folder_path=args.folder_path,
			base_url=args.base_url,
			api_key=api_key,
			batch_size=args.batch_size,
			log_path=args.log_path
		)
	)
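
The script records each successfully uploaded file in uploaded_files.json (or the path given with --log_path) and skips those files on later runs, so an interrupted upload can be resumed by re-running the same command.

If you still hit the throttle mentioned in the note above, one option is to space uploads out explicitly. The following is a minimal sketch (not part of the script above) of a variant of upload_file that sleeps before releasing its concurrency slot; the 10-second delay is an assumption derived from the default throttle of 6 files per minute, and it only bounds the overall rate at that level when the script runs with --batch_size 1.

# Hypothetical variant of upload_file: same upload call, plus a pause so each
# concurrency slot starts at most one upload every delay_seconds.
async def upload_file_throttled(
		api_key: str, base_url: str, file_path: Path,
		folder_path: Path, semaphore: asyncio.Semaphore, delay_seconds: float = 10.0
):
	async with semaphore:
		client = AsyncOpenAI(api_key=api_key, base_url=base_url)
		with open(file_path, "rb") as f:
			response = await client.files.create(file=f, purpose="documents")
		# Wait before releasing the semaphore to stay under the upload throttle.
		await asyncio.sleep(delay_seconds)
		return response, file_path.relative_to(folder_path)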

