
Parallel Processing

The parallel module provides tools for parallel execution of tasks with controlled API rate limiting and robust error handling. This module is particularly useful for accelerating the chapter generation process while respecting API rate limits.

Overview

The parallel processing capabilities include:

  1. Controlled Parallel Execution: Execute tasks in parallel with configurable workers
  2. Rate Limit Management: Add randomized delays between API requests to avoid rate limits
  3. Robust Error Handling: Automatically retry failed requests with exponential backoff
  4. Ordered Results: Ensure outputs are returned in the same order as inputs, regardless of completion time
  5. Progressive File Saving: Each chapter is saved to disk as soon as it's generated, preventing data loss

Key Functions

parallel_generate_chapters

The main orchestration function that handles parallel generation of course chapters:

import geminiteacher as gt
from geminiteacher.parallel import parallel_generate_chapters

# Define chapter titles to generate
chapter_titles = [
    "Introduction to Machine Learning",
    "Supervised Learning Algorithms",
    "Unsupervised Learning Techniques"
]

# Generate chapters in parallel
chapters = parallel_generate_chapters(
    chapter_titles=chapter_titles,
    content="Your raw content here",
    max_workers=4,              # Number of parallel workers (processes)
    delay_range=(0.2, 0.8),     # Random delay between API requests in seconds
    max_retries=3,              # Number of retry attempts for failed requests
    course_title="ML_Course",   # Title for saved files
    output_dir="courses"        # Directory to save generated chapters
)

# Process the generated chapters
for i, chapter in enumerate(chapters):
    print(f"Chapter {i+1}: {chapter.title}")

generate_chapter_with_retry

A robust wrapper around the standard generate_chapter function that adds retry logic:

import geminiteacher as gt
from geminiteacher.parallel import generate_chapter_with_retry

# Generate a single chapter with retry logic
chapter = generate_chapter_with_retry(
    chapter_title="Introduction to Neural Networks",
    content="Your raw content here",
    max_retries=3,              # Maximum number of retry attempts
    retry_delay=1.0             # Base delay between retries (will increase exponentially)
)

print(f"Chapter title: {chapter.title}")
print(f"Summary: {chapter.summary[:100]}...")

parallel_map_with_delay

A generic function for applying any function to a list of items in parallel with controlled delays:

from geminiteacher.parallel import parallel_map_with_delay
import time

# Define a function to execute in parallel
def process_item(item, prefix="Item"):
    # Simulate some work
    time.sleep(0.5)
    return f"{prefix}: {item}"

# Items to process
items = ["apple", "banana", "cherry", "date", "elderberry"]

# Process items in parallel with controlled delays
results = parallel_map_with_delay(
    func=process_item,
    items=items,
    max_workers=3,              # Number of parallel workers
    delay_range=(0.1, 0.5),     # Random delay between task submissions
    prefix="Processed"          # Additional parameter passed to process_item
)

# Results are in the same order as the input items
for item, result in zip(items, results):
    print(f"Original: {item} → Result: {result}")

Progressive File Saving

A key feature of the parallel processing module is its ability to save chapters to disk as they are generated. This provides several benefits:

  1. Data Safety: Even if the process is interrupted, completed chapters are already saved
  2. Progress Tracking: You can monitor progress by watching files appear in the output directory
  3. Immediate Access: Start reviewing early chapters while later ones are still being generated

Example of how files are saved:

import geminiteacher as gt

course = gt.create_course_parallel(
    content="Your content here",
    course_title="Data_Science",
    output_dir="my_courses"
)

# Files will be saved in a structure like:
# my_courses/
#   └── Data_Science/
#       ├── chapter_01_Introduction_to_Data_Science.md
#       ├── chapter_02_Data_Collection_and_Cleaning.md
#       └── chapter_03_Exploratory_Data_Analysis.md

Each chapter file contains the structured content with title, summary, explanation, and extension sections.
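
If you want to monitor progress or start reading completed chapters while generation is still running, you can simply glob the output directory. A minimal sketch, reusing the directory names from the example above (those paths are placeholders from that example, not library defaults):

from pathlib import Path

# Count the chapter files that have been written so far
saved = sorted(Path("my_courses/Data_Science").glob("chapter_*.md"))
print(f"{len(saved)} chapters saved so far")
for path in saved:
    print(f"  ready to review: {path.name}")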

API Rate Limits Consideration

When working with external APIs like Google's Gemini, rate limits are an important consideration. The parallel module helps manage these limits through controlled submission timing:

  1. Random Delays: Adds a configurable random delay between API requests to avoid overwhelming the API (sketched after this list)
  2. Exponential Backoff: When retrying failed requests, uses exponential backoff to gradually increase wait times
  3. Configurable Workers: Allows limiting the number of concurrent processes to respect API parallelism limits
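
Concretely, the submission delay is drawn uniformly from delay_range before each task is handed to the process pool; the same logic appears in the parallel_map_with_delay source further down. A standalone sketch of that behaviour:

import random
import time

delay_range = (0.2, 1.0)
for i, title in enumerate(["Chapter One", "Chapter Two"]):
    # Stagger submissions so requests don't hit the API in a single burst
    delay = random.uniform(*delay_range)
    time.sleep(delay)
    print(f"submitting task {i + 1} ('{title}') after a {delay:.2f}s delay")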

For the Google Gemini API, the following settings work well for most scenarios:

  • max_workers: 2-6 (depending on your API tier)
  • delay_range: (0.2, 1.0) seconds
  • max_retries: 3

These settings balance speed with API reliability. For higher API tiers with more generous rate limits, you can increase max_workers and decrease the delay range.
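
As a concrete starting point, these values can be passed straight to parallel_generate_chapters (a minimal sketch; the chapter titles and content are placeholders):

from geminiteacher.parallel import parallel_generate_chapters

chapters = parallel_generate_chapters(
    chapter_titles=["Introduction", "Core Concepts"],
    content="Your raw content here",
    max_workers=4,             # within the suggested 2-6 range
    delay_range=(0.2, 1.0),    # recommended delay window in seconds
    max_retries=3
)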

Error Handling

The parallel module implements comprehensive error handling:

  1. Retries for Empty Responses: Automatically retries when the API returns empty content
  2. Exception Recovery: Catches and handles API errors with automatic retries
  3. Fallback Content: If all retries fail, returns a structured error message instead of failing completely

This ensures robustness even when dealing with unreliable network conditions or API instability.
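
The retry wait time grows exponentially with a small random jitter, as implemented in generate_chapter_with_retry (see the source listing below). A standalone sketch of the backoff schedule:

import random

retry_delay = 1.0  # base delay in seconds
for attempt in range(3):
    # Each failed attempt waits roughly twice as long as the previous one
    backoff = retry_delay * (2 ** attempt) + random.uniform(0, 1)
    print(f"attempt {attempt + 1} failed: waiting {backoff:.2f}s before retrying")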

API Reference

Core Functions

parallel_generate_chapters

Generate multiple chapters in parallel with retry logic and rate limiting.

This function orchestrates the parallel generation of multiple chapters, handling API rate limits and retrying failed requests. Each chapter is saved to disk as soon as it's generated.

Parameters

chapter_titles : List[str]
    List of chapter titles to generate.
content : str
    The raw content to use for generating chapters.
llm : Optional[BaseLanguageModel], optional
    The language model to use. If None, a default model will be configured.
temperature : float, optional
    The temperature setting for generation. Default is 0.0.
custom_prompt : Optional[str], optional
    Custom instructions to append to the chapter generation prompt.
max_workers : Optional[int], optional
    Maximum number of worker processes. If None, uses the default.
delay_range : tuple, optional
    Range (min, max) in seconds for the random delay between task submissions. Default is (0.1, 0.5).
max_retries : int, optional
    Maximum number of retry attempts per chapter. Default is 3.
course_title : str, optional
    Title of the course for saving files. Default is "course".
output_dir : str, optional
    Directory to save the chapter files. Default is "output".

Returns

List[ChapterContent]
    List of generated chapter contents in the same order as the input titles.

Source code in src\geminiteacher\parallel.py
def parallel_generate_chapters(
    chapter_titles: List[str],
    content: str,
    llm: Optional[BaseLanguageModel] = None,
    temperature: float = 0.0,
    custom_prompt: Optional[str] = None,
    max_workers: Optional[int] = None,
    delay_range: tuple = (0.1, 0.5),
    max_retries: int = 3,
    course_title: str = "course",
    output_dir: str = "output"
) -> List[ChapterContent]:
    """
    Generate multiple chapters in parallel with retry logic and rate limiting.

    This function orchestrates the parallel generation of multiple chapters,
    handling API rate limits and retrying failed requests. Each chapter is
    saved to disk as soon as it's generated.

    Parameters
    ----------
    chapter_titles : List[str]
        List of chapter titles to generate.
    content : str
        The raw content to use for generating chapters.
    llm : Optional[BaseLanguageModel], optional
        The language model to use. If None, a default model will be configured.
    temperature : float, optional
        The temperature setting for generation. Default is 0.0.
    custom_prompt : Optional[str], optional
        Custom instructions to append to the chapter generation prompt.
    max_workers : Optional[int], optional
        Maximum number of worker processes. If None, uses the default.
    delay_range : tuple, optional
        Range (min, max) in seconds for the random delay between task submissions.
        Default is (0.1, 0.5).
    max_retries : int, optional
        Maximum number of retry attempts per chapter. Default is 3.
    course_title : str, optional
        Title of the course for saving files. Default is "course".
    output_dir : str, optional
        Directory to save the chapter files. Default is "output".

    Returns
    -------
    List[ChapterContent]
        List of generated chapter contents in the same order as the input titles.
    """
    logger = logging.getLogger("geminiteacher.parallel")
    logger.info(f"Starting parallel generation of {len(chapter_titles)} chapters with {max_workers or multiprocessing.cpu_count()} workers")

    # Get API key from the LLM if provided or environment
    api_key = None
    model_name = "gemini-1.5-pro"

    if llm is not None:
        # Try to extract API key and model name from the provided LLM
        try:
            # This assumes LLM is a ChatGoogleGenerativeAI instance
            api_key = getattr(llm, "google_api_key", None)
            model_name = getattr(llm, "model", model_name)
            logger.info(f"Using model: {model_name}")
        except Exception:
            logger.warning("Could not extract API key from provided LLM, will use environment variables")

    # Create a list of (index, chapter_title) tuples to preserve order
    indexed_titles = list(enumerate(chapter_titles))

    logger.info(f"Using delay range: {delay_range[0]}-{delay_range[1]}s between tasks")
    logger.info(f"Saving chapters progressively to {output_dir}/{course_title}/")

    # Generate chapters in parallel with delay between submissions and save each one as it completes
    results = parallel_map_with_delay(
        _worker_generate_and_save_chapter,
        indexed_titles,
        max_workers=max_workers,
        delay_range=delay_range,
        content=content,
        course_title=course_title,
        output_dir=output_dir,
        api_key=api_key,
        model_name=model_name,
        temperature=temperature,
        custom_prompt=custom_prompt,
        max_retries=max_retries,
        retry_delay=1.0
    )

    # Extract just the chapter content from the results (idx, chapter, file_path)
    chapters = [result[1] for result in results]

    logger.info(f"Completed parallel generation of {len(chapters)} chapters")
    return chapters 

generate_chapter_with_retry

Generate a chapter with retry logic for handling API failures.

This function wraps the generate_chapter function with retry logic to handle transient API errors, timeouts, or empty responses.

Parameters

chapter_title : str
    The title of the chapter to generate.
content : str
    The raw content to use for generating the chapter.
llm : Optional[BaseLanguageModel], optional
    The language model to use. If None, a default model will be configured.
temperature : float, optional
    The temperature setting for generation. Default is 0.0.
custom_prompt : Optional[str], optional
    Custom instructions to append to the chapter generation prompt.
max_retries : int, optional
    Maximum number of retry attempts. Default is 3.
retry_delay : float, optional
    Base delay between retries in seconds. Default is 1.0.

Returns

ChapterContent
    The generated chapter content.

Notes

This function implements an exponential backoff strategy for retries, with each retry attempt waiting longer than the previous one.

Source code in src\geminiteacher\parallel.py
def generate_chapter_with_retry(
    chapter_title: str, 
    content: str, 
    llm: Optional[BaseLanguageModel] = None,
    temperature: float = 0.0,
    custom_prompt: Optional[str] = None,
    max_retries: int = 3,
    retry_delay: float = 1.0
) -> ChapterContent:
    """
    Generate a chapter with retry logic for handling API failures.

    This function wraps the generate_chapter function with retry logic to handle
    transient API errors, timeouts, or empty responses.

    Parameters
    ----------
    chapter_title : str
        The title of the chapter to generate.
    content : str
        The raw content to use for generating the chapter.
    llm : Optional[BaseLanguageModel], optional
        The language model to use. If None, a default model will be configured.
    temperature : float, optional
        The temperature setting for generation. Default is 0.0.
    custom_prompt : Optional[str], optional
        Custom instructions to append to the chapter generation prompt.
    max_retries : int, optional
        Maximum number of retry attempts. Default is 3.
    retry_delay : float, optional
        Base delay between retries in seconds. Default is 1.0.

    Returns
    -------
    ChapterContent
        The generated chapter content.

    Notes
    -----
    This function implements an exponential backoff strategy for retries,
    with each retry attempt waiting longer than the previous one.
    """
    logger = logging.getLogger("geminiteacher.parallel")

    for attempt in range(max_retries + 1):
        try:
            logger.info(f"Generating chapter '{chapter_title}' (attempt {attempt + 1}/{max_retries + 1})")
            chapter = generate_chapter(
                chapter_title=chapter_title,
                content=content,
                llm=llm,
                temperature=temperature,
                custom_prompt=custom_prompt
            )

            # Check if we got a valid response (non-empty explanation)
            if chapter.explanation.strip():
                logger.info(f"Successfully generated chapter '{chapter_title}' (length: {len(chapter.explanation)} chars)")
                return chapter
            else:
                raise ValueError("Empty chapter explanation received")

        except Exception as e:
            if attempt < max_retries:
                # Calculate backoff with jitter
                backoff = retry_delay * (2 ** attempt) + random.uniform(0, 1)
                logger.warning(
                    f"Chapter generation failed for '{chapter_title}' "
                    f"(attempt {attempt + 1}/{max_retries + 1}): {str(e)}. "
                    f"Retrying in {backoff:.2f}s..."
                )
                time.sleep(backoff)
            else:
                logger.error(
                    f"All retry attempts failed for chapter '{chapter_title}'. "
                    f"Last error: {str(e)}"
                )
                # Return a basic chapter with error information
                return ChapterContent(
                    title=chapter_title,
                    summary="Error: Failed to generate chapter content after multiple attempts.",
                    explanation=f"The chapter generation process encountered repeated errors: {str(e)}",
                    extension="Please try regenerating this chapter or check your API configuration."
                )

parallel_map_with_delay

Execute a function on multiple items in parallel with a delay between submissions.

This function uses ProcessPoolExecutor to parallelize the execution of a function across multiple items, while introducing a random delay between task submissions to avoid overwhelming external APIs with simultaneous requests.

Parameters

func : Callable[..., T]
    The function to execute in parallel.
items : List[Any]
    The list of items to process.
max_workers : Optional[int], optional
    Maximum number of worker processes. If None, uses the default (typically the number of CPU cores).
delay_range : tuple, optional
    Range (min, max) in seconds for the random delay between task submissions. Default is (0.1, 0.5).
**kwargs
    Additional keyword arguments to pass to the function.

Returns

List[T]
    List of results in the same order as the input items.

Examples

>>> def process_item(item, factor=1):
...     return item * factor
>>> items = [1, 2, 3, 4, 5]
>>> results = parallel_map_with_delay(process_item, items, factor=2)
>>> print(results)
[2, 4, 6, 8, 10]

Source code in src\geminiteacher\parallel.py
def parallel_map_with_delay(
    func: Callable[..., T],
    items: List[Any],
    max_workers: Optional[int] = None,
    delay_range: tuple = (0.1, 0.5),
    **kwargs
) -> List[T]:
    """
    Execute a function on multiple items in parallel with a delay between submissions.

    This function uses ProcessPoolExecutor to parallelize the execution of a function
    across multiple items, while introducing a random delay between task submissions
    to avoid overwhelming external APIs with simultaneous requests.

    Parameters
    ----------
    func : Callable[..., T]
        The function to execute in parallel.
    items : List[Any]
        The list of items to process.
    max_workers : Optional[int], optional
        Maximum number of worker processes. If None, uses the default
        (typically the number of CPU cores).
    delay_range : tuple, optional
        Range (min, max) in seconds for the random delay between task submissions.
        Default is (0.1, 0.5).
    **kwargs
        Additional keyword arguments to pass to the function.

    Returns
    -------
    List[T]
        List of results in the same order as the input items.

    Examples
    --------
    >>> def process_item(item, factor=1):
    ...     return item * factor
    >>> items = [1, 2, 3, 4, 5]
    >>> results = parallel_map_with_delay(process_item, items, factor=2)
    >>> print(results)
    [2, 4, 6, 8, 10]
    """
    logger = logging.getLogger("geminiteacher.parallel")
    results = []
    total_items = len(items)

    # Export the current log level to the environment for worker processes
    os.environ["GeminiTeacher_LOG_LEVEL"] = logger.getEffectiveLevel().__str__()

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks with a delay between submissions
        futures = []
        for i, item in enumerate(items):
            # Add a small random delay to avoid overwhelming the API
            delay = random.uniform(delay_range[0], delay_range[1])
            logger.debug(f"Submitting task {i+1}/{total_items} with delay: {delay:.2f}s")
            time.sleep(delay)

            # Submit the task to the process pool
            future = executor.submit(func, item, **kwargs)
            futures.append(future)
            logger.info(f"Submitted task {i+1}/{total_items}")

        # Collect results in the original order
        for i, future in enumerate(futures):
            try:
                logger.info(f"Waiting for task {i+1}/{total_items} to complete")
                result = future.result()
                results.append(result)
                logger.info(f"Completed task {i+1}/{total_items}")
            except Exception as e:
                logger.error(f"Task {i+1}/{total_items} failed: {str(e)}")
                # Re-raise the exception to maintain the expected behavior
                raise

    return results

Performance Considerations

When using parallel processing, consider the following to optimize performance:

  1. CPU Cores: The optimal max_workers is typically close to the number of available CPU cores (see the sketch after this list)
  2. Memory Usage: Each worker process requires memory, so limit max_workers on memory-constrained systems
  3. API Rate Limits: Always respect API rate limits by adjusting delay_range and max_workers
  4. Task Granularity: Parallel processing works best when individual tasks take significant time
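
A minimal sketch for choosing max_workers from the machine's CPU count (the cap of 4 is an arbitrary illustration, not a library default):

import multiprocessing

# Stay close to the CPU count, but cap it to respect API parallelism limits
max_workers = min(multiprocessing.cpu_count(), 4)
print(f"Using {max_workers} workers")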

Integration with Course Generator

The parallel module integrates seamlessly with the coursemaker module through the create_course_parallel function:

import geminiteacher as gt

# Generate a course using parallel processing
course = gt.create_course_parallel(
    "Your raw content here",
    max_workers=4,
    delay_range=(0.2, 0.8),
    max_retries=3,
    course_title="Advanced_Topics",
    output_dir="output/courses"
)

print(f"Generated {len(course.chapters)} chapters in parallel")

Limitations

  • Increased memory usage compared to sequential processing
  • Potential for higher API costs due to faster request rates
  • Debugging can be more complex in parallel environments

Future Enhancements

Future versions may include:

  • Adaptive rate limiting based on API response times
  • Better telemetry for monitoring API usage
  • Support for concurrent.futures.ThreadPoolExecutor for I/O-bound tasks
  • Dynamic worker allocation based on system resources