
Python for GenAI Engineers — Async, Pydantic & LLM Production Patterns

Python for GenAI engineering is not the same as Python for data science, web development, or general scripting. The requirements are distinct. Knowing the language is not sufficient. You need to know how to apply specific patterns that address the unique challenges of building production LLM systems.

The core problem: most Python tutorials teach synchronous, single-file scripts. Most GenAI tutorials extend this pattern to call OpenAI’s API. But production LLM applications are I/O-bound, concurrent, distributed, and probabilistic. They call multiple APIs simultaneously, handle streaming responses, validate unpredictable LLM outputs, and fail in ways that standard exception handling does not anticipate.

This guide focuses on the Python skills that separate tutorial-level GenAI code from production-ready code. It addresses what changes when you move from a notebook to a production API, and what patterns senior engineers use that junior engineers often miss.


The Gap Between Tutorial Code and Production Code


A typical LangChain tutorial:

# Tutorial pattern — synchronous, no error handling, no types
from langchain_openai import ChatOpenAI
llm = ChatOpenAI()
response = llm.invoke("What is RAG?")
print(response.content)

A production pattern for the same operation:

# Production pattern — async, typed, error handling, observability
import asyncio
import logging
from typing import Optional
from openai import AsyncOpenAI, RateLimitError, APITimeoutError
from pydantic import BaseModel
logger = logging.getLogger(__name__)

class LLMResponse(BaseModel):
    content: str
    model: str
    prompt_tokens: int
    completion_tokens: int

async def query_llm(
    prompt: str,
    model: str = "gpt-4o-mini",
    max_retries: int = 3,
    timeout_seconds: float = 30.0
) -> Optional[LLMResponse]:
    client = AsyncOpenAI()
    for attempt in range(max_retries):
        try:
            response = await asyncio.wait_for(
                client.chat.completions.create(
                    model=model,
                    messages=[{"role": "user", "content": prompt}],
                    temperature=0
                ),
                timeout=timeout_seconds
            )
            return LLMResponse(
                content=response.choices[0].message.content,
                model=response.model,
                prompt_tokens=response.usage.prompt_tokens,
                completion_tokens=response.usage.completion_tokens
            )
        except RateLimitError:
            wait_time = 2 ** attempt
            logger.warning(f"Rate limited, waiting {wait_time}s (attempt {attempt + 1})")
            await asyncio.sleep(wait_time)
        except APITimeoutError:
            logger.error(f"LLM timeout on attempt {attempt + 1}")
            if attempt == max_retries - 1:
                raise
    return None

The difference is not stylistic. The tutorial code is unsuitable for production because:

  1. It blocks the calling thread during the API call (and the event loop, if run inside an async server)
  2. It has no retry logic for rate limits (which happen regularly at scale)
  3. It has no timeout (the API can hang indefinitely)
  4. It returns unstructured data that requires error-prone string manipulation
  5. It logs nothing, making debugging in production impossible

Each of these failures corresponds to a real category of Python knowledge: async programming, retry patterns, type safety, and observability.

Production LLM applications differ from ordinary Python workloads in four ways.

They are I/O-bound, not CPU-bound. The bottleneck in a RAG pipeline is almost never CPU computation. It is waiting for API responses: embedding calls, LLM generation, vector database queries. Python’s async/await model is designed for exactly this pattern. Using synchronous code forfeits the performance available from concurrency.

They interact with probabilistic systems. Standard Python exception handling assumes that functions either succeed or raise an exception. LLM responses can succeed (no exception raised) but contain malformed JSON, fail to follow format instructions, or produce outputs that violate your schema. Standard try/except blocks do not catch these failures. Pydantic validation does.

They involve expensive operations. A single GPT-4o call can cost $0.03–0.10 depending on context length. A bug that causes the same prompt to be sent 100 times does not raise an exception — it silently costs money. Cost awareness requires explicit token counting and monitoring.
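
As a rough worked example, using GPT-4o’s approximate list prices from the pricing table later in this guide ($2.50 per 1M prompt tokens, $10.00 per 1M completion tokens — verify current pricing):

# A call with 10,000 prompt tokens and 500 completion tokens at approximate GPT-4o prices
prompt_cost = 10_000 * 2.50 / 1_000_000      # $0.025
completion_cost = 500 * 10.00 / 1_000_000    # $0.005
print(f"${prompt_cost + completion_cost:.3f} per call")  # ≈ $0.03 — a bug that repeats this 100 times costs ~$3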

They operate under rate limits. Every LLM provider imposes rate limits. Production applications must implement retry logic with backoff. This is not optional.


The central mental model for Python in GenAI is the event loop. Python’s asyncio event loop is a single-threaded scheduler that manages concurrent I/O operations without the overhead of threads.

Event Loop (single thread):

Time → │── LLM Call 1 ──│────── waiting ──────│── LLM Call 1 response ──│
       │── LLM Call 2 ──────────────────────────────│── LLM Call 2 response ──│
       │── DB Query ────│── DB response ──│
       │── Embed ──│── Embed response ──│

Without async: each operation blocks the thread, total time = sum of all waits.

With async: while waiting for one operation’s response, the event loop executes other operations. Total time ≈ longest single operation, not the sum.

For a RAG pipeline that calls an embedding API (100ms), a vector database (50ms), and an LLM (500ms), the difference is:

  • Synchronous: 650ms sequential (100 + 50 + 500)
  • Async: ~550ms, with the embedding call overlapped with independent work such as a cache check via asyncio.gather; the 500ms LLM call still dominates

The gains are more dramatic when processing batches or handling multiple concurrent requests.
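
A minimal sketch of that claim, using asyncio.sleep to stand in for independent network waits (in a real pipeline the retrieval step depends on the embedding, so not every wait can overlap):

import asyncio
import time

async def fake_call(name: str, seconds: float) -> str:
    # Stands in for an awaitable network call (embedding, cache lookup, LLM)
    await asyncio.sleep(seconds)
    return name

async def main() -> None:
    start = time.perf_counter()
    for name, seconds in [("embed", 0.10), ("search", 0.05), ("llm", 0.50)]:
        await fake_call(name, seconds)
    print(f"sequential: {time.perf_counter() - start:.2f}s")  # ~0.65s — the sum of all waits

    start = time.perf_counter()
    await asyncio.gather(
        fake_call("embed", 0.10),
        fake_call("search", 0.05),
        fake_call("llm", 0.50),
    )
    print(f"concurrent: {time.perf_counter() - start:.2f}s")  # ~0.50s — the longest single wait

asyncio.run(main())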

Python is dynamically typed. In production, this creates ambiguity about what functions accept and return, especially when dealing with LLM outputs that are inherently unstructured.

The mental model: treat type hints not just as documentation, but as contracts enforced by Pydantic validation at runtime. An LLM output validated through a Pydantic model fails loudly (raises ValidationError) when the LLM returns malformed output, instead of failing silently when you try to access a field that does not exist.

# Without type safety:
import json

response = llm.invoke(prompt)
data = json.loads(response.content)
name = data["name"]  # KeyError if LLM returns wrong format — discovered at runtime

# With type safety:
from pydantic import BaseModel

class UserInfo(BaseModel):
    name: str
    email: str
    role: str

response = llm.with_structured_output(UserInfo).invoke(prompt)
# ValidationError raised immediately if LLM output doesn't match schema

Every piece of code you write for a GenAI system should answer: “What happens when this fails?”

LLM APIs fail. Rate limits are hit. Responses are malformed. Embeddings timeout. Vector databases refuse connections. The question is not whether failures will happen, but how your code handles them.

A production-first mental model treats failures as first-class concerns, not edge cases.
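
One way to make the question concrete is to hand callers a typed result instead of a bare string or an unexpected exception — a sketch building on the query_llm helper shown earlier, not a prescribed pattern:

from typing import Optional
from pydantic import BaseModel
from openai import APITimeoutError

class LLMResult(BaseModel):
    ok: bool
    content: Optional[str] = None
    error_type: Optional[str] = None  # e.g. "timeout", "exhausted_retries"

async def safe_answer(prompt: str) -> LLMResult:
    try:
        result = await query_llm(prompt)  # the production example from the start of this guide
        if result is None:
            return LLMResult(ok=False, error_type="exhausted_retries")
        return LLMResult(ok=True, content=result.content)
    except APITimeoutError:
        return LLMResult(ok=False, error_type="timeout")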


Skill 1: Async/Await for Concurrent I/O

Async programming in Python requires understanding three things: coroutines, the event loop, and how to compose concurrent operations.

Coroutines are not threads. async def creates a coroutine function that returns a coroutine object when called. It does not start execution. Execution only begins when the coroutine is awaited in the context of an event loop.

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

# A coroutine function
async def embed_text(text: str) -> list[float]:
    response = await client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return response.data[0].embedding

# Running coroutines concurrently — the key to performance
async def embed_many(texts: list[str]) -> list[list[float]]:
    # asyncio.gather runs all coroutines concurrently
    # Total time ≈ slowest single call, not sum of all calls
    embeddings = await asyncio.gather(*[embed_text(t) for t in texts])
    return list(embeddings)

# Controlling concurrency to avoid rate limits
async def embed_with_limit(
    texts: list[str],
    max_concurrent: int = 10
) -> list[list[float]]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def embed_with_semaphore(text: str) -> list[float]:
        async with semaphore:
            return await embed_text(text)

    return await asyncio.gather(*[embed_with_semaphore(t) for t in texts])

Common mistake: mixing sync and async code. Calling a synchronous blocking function inside an async context blocks the entire event loop, defeating the purpose of async.

# Bad: blocks the event loop
async def bad_rag(query: str) -> str:
    embedding = embed_text_sync(query)          # Blocks the loop
    results = vector_db.search_sync(embedding)  # Blocks the loop
    return generate_response_sync(results)

# Good: everything is async
async def good_rag(query: str) -> str:
    embedding = await embed_text(query)
    results = await vector_db.search(embedding)
    return await generate_response(results)

If you must call synchronous code from async context:

import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=10)

async def run_sync_in_thread(sync_func, *args):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, sync_func, *args)

Skill 2: Pydantic for Structured LLM Output


Pydantic is the most important library for production GenAI code. LLMs are instructed to return structured data (JSON), but they do not always comply. Pydantic validates the output and raises clear errors when validation fails.

from pydantic import BaseModel, Field, field_validator
from typing import List, Optional
from enum import Enum

class Sentiment(str, Enum):
    positive = "positive"
    negative = "negative"
    neutral = "neutral"

class DocumentAnalysis(BaseModel):
    summary: str = Field(description="2-3 sentence summary")
    sentiment: Sentiment
    key_topics: List[str] = Field(min_length=1, max_length=10)
    confidence_score: float = Field(ge=0.0, le=1.0)
    requires_escalation: bool

    @field_validator("summary")
    @classmethod
    def summary_not_empty(cls, v: str) -> str:
        if not v.strip():
            raise ValueError("Summary cannot be empty")
        return v.strip()

# Using with OpenAI structured output
from openai import OpenAI

client = OpenAI()

def analyze_document(text: str) -> DocumentAnalysis:
    response = client.beta.chat.completions.parse(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Analyze the document and return structured data."},
            {"role": "user", "content": text}
        ],
        response_format=DocumentAnalysis,  # Pydantic model enforces the schema
    )
    # This raises ValidationError if LLM output doesn't match the schema
    return response.choices[0].message.parsed

Using Pydantic with LangChain:

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
llm = ChatOpenAI(model="gpt-4o-mini")
structured_llm = llm.with_structured_output(DocumentAnalysis)
prompt = ChatPromptTemplate.from_template("Analyze this document: {text}")
chain = prompt | structured_llm
result: DocumentAnalysis = chain.invoke({"text": document_text})
# result is a validated DocumentAnalysis instance
print(result.sentiment) # Type-safe access

Skill 3: Retry Logic with Exponential Backoff


Rate limit errors (HTTP 429) are a fact of life with LLM APIs. They occur when you exceed tokens-per-minute or requests-per-minute limits. Every production LLM client needs retry logic.

import asyncio
import random
import logging
from functools import wraps
from typing import TypeVar, Callable, Any
from openai import RateLimitError, APITimeoutError, APIConnectionError

T = TypeVar("T")
logger = logging.getLogger(__name__)

def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
    retryable: tuple = (RateLimitError, APITimeoutError, APIConnectionError)
):
    """
    Decorator for async functions that retries on transient failures.

    Jitter prevents the thundering herd problem: without jitter,
    all clients hit the rate limit simultaneously, then all retry
    simultaneously, causing the limit to be exceeded again.
    """
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except retryable as e:
                    if attempt == max_retries:
                        logger.error(
                            f"{func.__name__} failed after {max_retries + 1} attempts: {e}"
                        )
                        raise
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    if jitter:
                        delay *= (0.5 + random.random())
                    logger.warning(
                        f"{func.__name__} attempt {attempt + 1} failed ({type(e).__name__}), "
                        f"retrying in {delay:.1f}s"
                    )
                    await asyncio.sleep(delay)
        return wrapper
    return decorator

# Usage
@retry_with_backoff(max_retries=3, base_delay=1.0)
async def generate(prompt: str) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

Skill 4: Generator Functions for Streaming


LLM streaming sends tokens to the user as they are generated, improving perceived latency even when total generation time is the same. Python generators are the natural abstraction.

from typing import AsyncIterator
from openai import AsyncOpenAI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

client = AsyncOpenAI()
app = FastAPI()

async def stream_tokens(prompt: str) -> AsyncIterator[str]:
    """Stream LLM tokens as they arrive."""
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            yield delta.content

@app.get("/stream")
async def stream_endpoint(question: str) -> StreamingResponse:
    async def generate():
        async for token in stream_tokens(question):
            # Server-Sent Events format
            yield f"data: {token}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

Skill 5: Context Managers for Resource Management


GenAI applications use multiple clients (OpenAI, Pinecone, database connections). Context managers ensure these resources are properly initialized and cleaned up.

import os
from contextlib import asynccontextmanager
from fastapi import FastAPI
from pinecone import Pinecone
from openai import AsyncOpenAI

# Global resources
resources = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize resources on startup, clean up on shutdown."""
    # Startup
    resources["openai"] = AsyncOpenAI()
    pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
    resources["vector_index"] = pc.Index("production")
    yield  # Application runs here
    # Shutdown
    await resources["openai"].close()
    # Pinecone client does not need explicit cleanup

app = FastAPI(lifespan=lifespan)

# Access resources in route handlers
@app.post("/query")
async def query(request: QueryRequest):
    client = resources["openai"]
    index = resources["vector_index"]
    # ... use resources

How Python Components Connect in a RAG System

FastAPI Application
┌───────────────────────────────────────────┐
│ Request Handler                           │
│ (validates input with Pydantic)           │
└─────────────────────┬─────────────────────┘
                      │ async
            ┌─────────┴──────────────┐
            ▼                        ▼
  ┌────────────────────┐  ┌──────────────────────┐
  │ Embedding Service  │  │ Cache Check          │
  │ (async OpenAI SDK) │  │ (Redis async client) │
  └─────────┬──────────┘  └──────────┬───────────┘
            │ vector                 │ cache miss
            └──────────┬─────────────┘
             (run concurrently via asyncio.gather)
                       ▼
            ┌──────────────────────┐
            │ Vector DB            │
            │ (async Pinecone/     │
            │  Weaviate client)    │
            └──────────┬───────────┘
                       │ chunks / merged results
                       ▼
            ┌──────────────────────┐
            │ LLM Generation       │
            │ (async OpenAI SDK)   │
            │ + streaming          │
            └──────────┬───────────┘
                       │ response
                       ▼
            ┌──────────────────────┐
            │ Output Validation    │
            │ (Pydantic model)     │
            └──────────────────────┘

Sync vs Async RAG Pipeline Timing — both pipelines do the same work; asyncio.gather runs the embedding call and the cache check in parallel, saving roughly 15% of total latency.

Synchronous (~650ms total — each step blocks):
  Embed Query (100ms) → Vector Search (50ms) → LLM Generate (500ms) → Response

Async with asyncio.gather (~550ms total — embed and cache check run in parallel):
  Embed Query ∥ Cache Check (100ms) → Vector Search (50ms) → LLM Generate (500ms) → Response
Pattern 1: The RAG Query Handler (Production-Grade)

import asyncio
import logging
from typing import Optional, List
from pydantic import BaseModel
from openai import AsyncOpenAI, RateLimitError, APITimeoutError

logger = logging.getLogger(__name__)

class QueryRequest(BaseModel):
    question: str
    top_k: int = 5
    temperature: float = 0.0

class QueryResponse(BaseModel):
    answer: str
    sources: List[str]
    tokens_used: int

class RAGService:
    def __init__(
        self,
        openai_client: AsyncOpenAI,
        vector_index,
        embedding_model: str = "text-embedding-3-small",
        generation_model: str = "gpt-4o-mini"
    ):
        self.openai = openai_client
        self.index = vector_index
        self.embed_model = embedding_model
        self.gen_model = generation_model

    async def _embed(self, text: str) -> list[float]:
        response = await self.openai.embeddings.create(
            model=self.embed_model,
            input=text
        )
        return response.data[0].embedding

    async def _retrieve(
        self, embedding: list[float], top_k: int, filter_dict: Optional[dict] = None
    ) -> list[dict]:
        results = self.index.query(
            vector=embedding,
            top_k=top_k,
            filter=filter_dict,
            include_metadata=True
        )
        return [
            {"text": r.metadata.get("text", ""), "source": r.metadata.get("source", "")}
            for r in results.matches
        ]

    async def _generate(
        self, question: str, context: list[dict], temperature: float
    ) -> tuple[str, int]:
        context_text = "\n\n".join(
            f"Source: {doc['source']}\n{doc['text']}"
            for doc in context
        )
        messages = [
            {
                "role": "system",
                "content": (
                    "Answer the question using only the provided context. "
                    "Cite sources. If the answer is not in the context, say so."
                )
            },
            {
                "role": "user",
                "content": f"Context:\n{context_text}\n\nQuestion: {question}"
            }
        ]
        response = await self.openai.chat.completions.create(
            model=self.gen_model,
            messages=messages,
            temperature=temperature
        )
        return (
            response.choices[0].message.content,
            response.usage.total_tokens
        )

    @retry_with_backoff(max_retries=3, base_delay=1.0)  # decorator from Skill 3
    async def query(self, request: QueryRequest) -> QueryResponse:
        """Execute a RAG query with proper error handling."""
        # Retrieval depends on the embedding, so these two steps run sequentially;
        # independent work (e.g. a cache check) is where asyncio.gather pays off
        embedding = await self._embed(request.question)
        chunks = await self._retrieve(embedding, request.top_k)
        if not chunks:
            return QueryResponse(
                answer="No relevant information found in the knowledge base.",
                sources=[],
                tokens_used=0
            )
        answer, tokens = await self._generate(
            request.question, chunks, request.temperature
        )
        sources = list({doc["source"] for doc in chunks if doc["source"]})
        return QueryResponse(answer=answer, sources=sources, tokens_used=tokens)

Pattern 2: Batch Processing with Semaphore

import asyncio
import logging
from typing import Awaitable, List, TypeVar, Callable, AsyncIterator

T = TypeVar("T")
R = TypeVar("R")
logger = logging.getLogger(__name__)

async def process_batch(
    items: List[T],
    process_func: Callable[[T], Awaitable[R]],
    max_concurrent: int = 10,
    delay_between_batches: float = 0.1
) -> AsyncIterator[R]:
    """
    Process a list of items with bounded concurrency.
    Used for bulk embedding, batch document processing, etc.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_with_semaphore(item: T) -> R:
        async with semaphore:
            return await process_func(item)

    # Process in chunks to avoid overwhelming the event loop
    chunk_size = max_concurrent * 5
    for i in range(0, len(items), chunk_size):
        chunk = items[i:i + chunk_size]
        results = await asyncio.gather(
            *[process_with_semaphore(item) for item in chunk],
            return_exceptions=True
        )
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"Batch item failed: {result}")
            else:
                yield result
        if i + chunk_size < len(items):
            await asyncio.sleep(delay_between_batches)

# Usage: embed 10,000 documents with max 10 concurrent API calls
async def embed_corpus(documents: List[str]) -> List[list[float]]:
    async def embed_one(text: str) -> list[float]:
        response = await openai_client.embeddings.create(  # shared AsyncOpenAI client
            model="text-embedding-3-small", input=text
        )
        return response.data[0].embedding

    embeddings = []
    async for embedding in process_batch(documents, embed_one, max_concurrent=10):
        embeddings.append(embedding)
    return embeddings

Pattern 3: Token Counting for Cost Control

import tiktoken
from dataclasses import dataclass, field
from threading import Lock

@dataclass
class TokenUsageTracker:
    model: str
    prompt_tokens: int = 0
    completion_tokens: int = 0
    _lock: Lock = field(default_factory=Lock, repr=False)

    # Approximate pricing per 1M tokens (verify current pricing)
    PRICING = {
        "gpt-4o": {"prompt": 2.50, "completion": 10.00},
        "gpt-4o-mini": {"prompt": 0.15, "completion": 0.60},
        "gpt-3.5-turbo": {"prompt": 0.50, "completion": 1.50},
    }

    def add_usage(self, prompt: int, completion: int) -> None:
        with self._lock:
            self.prompt_tokens += prompt
            self.completion_tokens += completion

    @property
    def estimated_cost_usd(self) -> float:
        pricing = self.PRICING.get(self.model, {"prompt": 0, "completion": 0})
        return (
            self.prompt_tokens * pricing["prompt"] / 1_000_000
            + self.completion_tokens * pricing["completion"] / 1_000_000
        )

def count_tokens(text: str, model: str = "gpt-4o-mini") -> int:
    """Count tokens before making an API call — catches context overflow early."""
    encoding_map = {
        "gpt-4o": "o200k_base",
        "gpt-4o-mini": "o200k_base",
        "gpt-3.5-turbo": "cl100k_base",
    }
    encoding_name = encoding_map.get(model, "cl100k_base")
    encoding = tiktoken.get_encoding(encoding_name)
    return len(encoding.encode(text))

def build_prompt_safely(
    context_chunks: list[str],
    question: str,
    system_prompt: str,
    model: str = "gpt-4o-mini",
    max_context_tokens: int = 3000
) -> list[dict]:
    """Build a prompt that respects the context budget."""
    included_chunks = []
    current_tokens = count_tokens(system_prompt + question, model)
    for chunk in context_chunks:
        chunk_tokens = count_tokens(chunk, model)
        if current_tokens + chunk_tokens > max_context_tokens:
            break
        included_chunks.append(chunk)
        current_tokens += chunk_tokens
    context = "\n\n".join(included_chunks)
    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
    ]

Trade-offs, Limitations, and Failure Modes


Failure Mode 1: The Sync Trap in Async Code


Symptom: Your async RAG service handles requests correctly but response times are worse than a synchronous implementation.

Root Cause: A synchronous database call or CPU-intensive operation inside an async handler blocks the event loop, preventing other coroutines from running.

How to detect:

import asyncio
import time

# This blocks the event loop — all other requests wait
async def bad_handler(query: str) -> str:
    embedding = create_embedding_sync(query)  # BLOCKS
    time.sleep(0.1)                           # BLOCKS
    results = db.query_sync(embedding)        # BLOCKS
    return generate_sync(results)             # BLOCKS

Mitigation: Enable asyncio debug mode (asyncio.run(main(), debug=True), or loop.set_debug(True) on a running loop), which logs callbacks and coroutine steps that block the loop for longer than 100ms (the default slow_callback_duration). Move blocking and CPU-intensive work to run_in_executor.
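
A small sketch of both mitigations, reusing the create_embedding_sync helper from the example above:

import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=10)

async def fixed_handler(query: str):
    loop = asyncio.get_running_loop()
    # The blocking call runs in a worker thread, so the event loop stays free
    return await loop.run_in_executor(executor, create_embedding_sync, query)

async def main() -> None:
    await fixed_handler("what is RAG?")

# debug=True makes asyncio log any callback or coroutine step that blocks the loop
# longer than loop.slow_callback_duration (0.1s by default)
asyncio.run(main(), debug=True)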

Failure Mode 2: Pydantic Validation Catching Real LLM Issues


Symptom: ValidationError exceptions appearing in production logs, more frequently with certain types of queries.

This is actually the system working correctly. The LLM is returning invalid output, and Pydantic is catching it before it causes a downstream data corruption issue. The failure mode to handle:

import logging
from typing import Optional
from pydantic import ValidationError

logger = logging.getLogger(__name__)

async def extract_entities(text: str) -> Optional[ExtractedEntities]:
    try:
        result = await structured_llm.ainvoke(text)
        return result
    except ValidationError as e:
        # Log the validation errors with context for debugging
        logger.warning(
            "LLM output validation failed",
            extra={"validation_errors": e.errors(), "text_length": len(text)}
        )
        # Return None and let the caller decide how to handle
        return None

Root causes of validation failures: Ambiguous prompts, context that confuses the LLM, edge cases the few-shot examples don’t cover. Use these failures as training data for improving prompts.
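
One optional recovery pattern — a sketch, assuming the same structured_llm and ExtractedEntities as above — is to feed the validation errors back to the model and retry once before giving up:

async def extract_with_repair(text: str, max_attempts: int = 2) -> Optional[ExtractedEntities]:
    error_hint = ""
    for attempt in range(max_attempts):
        try:
            return await structured_llm.ainvoke(text + error_hint)
        except ValidationError as e:
            # On the retry, tell the model exactly which fields failed validation
            error_hint = (
                "\n\nYour previous answer failed validation with these errors: "
                f"{e.errors()}. Return output that matches the schema exactly."
            )
    return None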

Failure Mode 3: Memory Leaks from Unclosed Async Clients

Section titled “Failure Mode 3: Memory Leaks from Unclosed Async Clients”

Symptom: Memory usage increases steadily, and warnings like “RuntimeWarning: Enable tracemalloc to get the object allocation traceback” appear in the logs.

Root Cause: Creating new AsyncOpenAI() or AsyncWeaviate() clients per request without closing them. Each client maintains a connection pool.

# Bad: new client per request (leaks connections)
@app.post("/query")
async def query_endpoint(request: QueryRequest) -> QueryResponse:
    client = AsyncOpenAI()  # New client, new connection pool
    result = await rag_service.query(client, request)
    # Client not closed — connections leaked
    return result

# Good: single shared client, proper lifecycle management
@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.openai_client = AsyncOpenAI()
    yield
    await app.state.openai_client.close()

app = FastAPI(lifespan=lifespan)

@app.post("/query")
async def query_endpoint(request: QueryRequest, req: Request) -> QueryResponse:
    client = req.app.state.openai_client
    return await rag_service.query(client, request)

Failure Mode 4: Type Hints Are Not Enforced at Runtime

Symptom: Type errors at runtime that your type hints should have caught.

Root Cause: Type hints in Python are not enforced at runtime by the interpreter. They are only checked by static type checkers (mypy, Pyright) or runtime validators (Pydantic).

# This runs without error at runtime, despite the type hint violation
def process(text: str) -> list[float]:
    return "not a list"  # No runtime error unless you check

# Pydantic enforces types at runtime:
from pydantic import BaseModel

class Config(BaseModel):
    embedding: list[float]
    temperature: float

Config(embedding="not a list", temperature=1.0)  # ValidationError raised

Mitigation: Use Pydantic for any data that crosses a system boundary (LLM output, API request, database response). Use mypy or Pyright in CI for static analysis.
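
For data that is not naturally a model — say, a bare list of floats coming back across a boundary — Pydantic's TypeAdapter gives the same runtime guarantee; a short sketch (load_embedding_from_cache is hypothetical):

from pydantic import TypeAdapter, ValidationError

embedding_adapter = TypeAdapter(list[float])

raw = load_embedding_from_cache("doc-123")  # hypothetical boundary returning data of unknown shape
try:
    embedding = embedding_adapter.validate_python(raw)  # raises unless raw really is list[float]
except ValidationError:
    embedding = None  # log and recompute instead of trusting malformed data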


What Interviewers Test in Python Interviews for GenAI Roles


Python questions in GenAI interviews are not generic Python questions. They are production systems questions applied to AI contexts.

Common question categories:

Concurrency: “How would you process 10,000 documents to generate embeddings as quickly as possible while staying within API rate limits?”

Expected answer demonstrates: asyncio.gather for concurrency, asyncio.Semaphore for rate limiting, asyncio.wait_for for timeouts, batch processing to amortize overhead.

Type safety: “How do you ensure that LLM output is structured and validated?”

Expected answer demonstrates: Pydantic models with BaseModel, OpenAI’s response_format with Pydantic, with_structured_output() in LangChain, handling ValidationError gracefully.

Error handling: “Implement a function that calls the OpenAI API with proper retry logic for rate limits.”

Expected answer demonstrates: exponential backoff, jitter, distinguishing retryable errors (429, 503) from non-retryable ones (400 bad request), circuit breaker pattern at scale.
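
The circuit breaker itself is easy to sketch — this is a generic illustration rather than any specific library's API: after a run of consecutive failures, stop calling the provider for a cooldown period instead of retrying into a wall:

import time

class CircuitBreaker:
    """Open after `threshold` consecutive failures; allow a trial request after `cooldown` seconds."""

    def __init__(self, threshold: int = 5, cooldown: float = 30.0):
        self.threshold = threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at: float | None = None

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        if time.monotonic() - self.opened_at >= self.cooldown:
            # Half-open: let one request through to test whether the API has recovered
            self.opened_at = None
            self.failures = 0
            return True
        return False

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = time.monotonic()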

Performance: “Explain how you would reduce the latency of a RAG pipeline from 2 seconds to under 1 second without changing the LLM model.”

Expected answer: async parallel embedding + retrieval, semantic caching for repeated queries, streaming to improve perceived latency, query rewriting to reduce context size.
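
An exact-match response cache is the simplest form of the caching idea (semantic caching keys on the query embedding rather than the literal string). A sketch with an in-process dict — in production this would typically live in Redis, and rag_service_query is a hypothetical stand-in for the full pipeline:

import hashlib

_cache: dict[str, str] = {}

def _cache_key(query: str) -> str:
    # Normalize so "What is RAG?" and "what is rag?" share a key
    return hashlib.sha256(query.strip().lower().encode()).hexdigest()

async def cached_answer(query: str) -> str:
    key = _cache_key(query)
    if key in _cache:
        return _cache[key]  # no embedding, retrieval, or LLM cost
    answer = await rag_service_query(query)  # hypothetical stand-in for the full RAG pipeline
    _cache[key] = answer
    return answer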

What signals seniority:

  • Discussing asyncio.Semaphore for rate limiting without being asked
  • Mentioning that type hints are not enforced at runtime and how to handle this
  • Explaining how asyncio.gather changes its error behavior with return_exceptions=True (see the sketch after this list)
  • Discussing connection pool management
  • Mentioning tiktoken for pre-flight token counting to prevent context overflow errors
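
A minimal sketch of that gather behavior, with coroutines that simply succeed or raise:

import asyncio

async def ok() -> str:
    return "ok"

async def boom() -> str:
    raise ValueError("boom")

async def main() -> None:
    # Default: the first exception propagates out of gather; the other results are lost to the caller
    try:
        await asyncio.gather(ok(), boom())
    except ValueError as e:
        print(f"gather raised: {e}")

    # return_exceptions=True: exceptions come back in the results list instead of being raised
    results = await asyncio.gather(ok(), boom(), return_exceptions=True)
    print(results)  # ['ok', ValueError('boom')]

asyncio.run(main())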

What Production GenAI Code Looks Like vs. Tutorials


Tutorial code characteristics:

  • Synchronous
  • No error handling
  • Inline configuration (API keys hardcoded or in simple environment variables)
  • No logging
  • No type hints
  • Single file

Production code characteristics:

  • Async throughout
  • Structured error handling with specific exception types
  • Configuration via Pydantic BaseSettings with validation
  • Structured logging (JSON format for log aggregation)
  • Comprehensive type hints with Pydantic validation
  • Modular structure with separation of concerns

Production configuration pattern:

from pydantic_settings import BaseSettings
from pydantic import SecretStr, field_validator

class Settings(BaseSettings):
    openai_api_key: SecretStr
    pinecone_api_key: SecretStr
    pinecone_index_name: str
    embedding_model: str = "text-embedding-3-small"
    generation_model: str = "gpt-4o-mini"
    max_concurrent_requests: int = 20
    request_timeout_seconds: float = 30.0
    max_retries: int = 3

    @field_validator("max_concurrent_requests")
    @classmethod
    def validate_concurrency(cls, v: int) -> int:
        if v < 1 or v > 100:
            raise ValueError("max_concurrent_requests must be between 1 and 100")
        return v

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"

# Used throughout the application
settings = Settings()
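
SecretStr keeps keys out of logs and tracebacks; the raw value is only exposed where it is explicitly requested:

from openai import AsyncOpenAI

print(settings.openai_api_key)  # prints ********** — safe to log
client = AsyncOpenAI(api_key=settings.openai_api_key.get_secret_value())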

Production logging pattern:

import logging
import json
from datetime import datetime, timezone

class JSONFormatter(logging.Formatter):
    # Attributes present on every LogRecord; anything else was passed via extra=
    _STANDARD_ATTRS = set(vars(logging.LogRecord("", 0, "", 0, "", (), None))) | {"message", "asctime"}

    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        # Fields passed as logger.info(..., extra={...}) become attributes on the record
        for key, value in record.__dict__.items():
            if key not in self._STANDARD_ATTRS:
                log_data[key] = value
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_data, default=str)

# Structured logging that works with Datadog, CloudWatch, Splunk
logging.basicConfig(handlers=[logging.StreamHandler()])
logging.getLogger().handlers[0].setFormatter(JSONFormatter())
Library          What to know                                              Depth needed
asyncio          gather, Semaphore, wait_for, event loop                   Deep
pydantic         BaseModel, validators, Settings, ValidationError          Deep
openai (SDK)     Async client, streaming, structured output, error types   Deep
tiktoken         Token counting per model, encoding types                  Moderate
fastapi          Async endpoints, Depends, lifespan, Request               Moderate
langchain_core   Runnable, LCEL, BaseRetriever                             Moderate
httpx            Async HTTP client for non-OpenAI APIs                     Basic

  1. Async/await mastery. Use asyncio.gather for parallel I/O, asyncio.Semaphore for rate limiting, and asyncio.wait_for for timeouts. Never block the event loop with synchronous code inside async handlers.

  2. Pydantic for all boundaries. Validate LLM output, API requests, and configuration with Pydantic models. Use ValidationError as a signal to improve prompts, not just a failure to suppress.

  3. Retry with exponential backoff. Implement jitter, distinguish retryable from non-retryable errors, and log all retry attempts for observability. Every LLM API call needs retry logic.

  4. Type hints as contracts. Use type hints consistently. Understand that they require Pydantic or mypy to be enforced. Treat unvalidated LLM output as Any type until validated through a model.

  5. Resource lifecycle management. Create async clients once at application startup, share them across requests, and close them on shutdown. Connection leaks are common and difficult to debug.

Self-Evaluation: Production Readiness Checklist


Before calling your code production-ready, verify:

  • All LLM API calls are async
  • All async functions have timeout handling
  • Rate limit errors trigger exponential backoff with jitter
  • LLM outputs are validated with Pydantic before use
  • Token counts are checked before API calls to prevent context overflow
  • HTTP clients are initialized once and shared, not created per request
  • All errors are logged with structured data (exception type, context, retry attempt)
  • Configuration is validated at startup with pydantic_settings

The Python skills that matter for GenAI engineering are not academic. They directly affect whether your system handles the Tuesday traffic spike or silently fails, whether it costs $200/month or $2,000/month, and whether a junior engineer can debug a production incident at 2am.

Python mastery for GenAI is not about knowing every language feature. It is about deeply knowing the patterns that production systems require: concurrency, validation, error handling, and observability. These patterns, applied consistently, are the difference between tutorial code and production code.

  • AI Agents and Agentic Systems — The async patterns in this guide are foundational for building production agent systems
  • Essential GenAI Tools — The full production tool stack that these Python patterns connect to
  • LangChain vs LangGraph — Apply async/await and Pydantic patterns to both LangChain chains and LangGraph state machines
  • Agentic Patterns — Production Python is required to implement ReAct, reflection, and tool use reliably
  • AI Coding Environments — How Cursor, Claude Code, and GitHub Copilot assist with writing production-quality Python

Last updated: February 2026. Python recommendations reflect production patterns for LLM application development using Python 3.10+ with modern async patterns.