Python for GenAI Engineers — Async, Pydantic & LLM Production Patterns
1. Introduction and Motivation
Python for GenAI engineering is not the same as Python for data science, web development, or general scripting. The requirements are distinct. Knowing the language is not sufficient. You need to know how to apply specific patterns that address the unique challenges of building production LLM systems.
The core problem: most Python tutorials teach synchronous, single-file scripts. Most GenAI tutorials extend this pattern to call OpenAI’s API. But production LLM applications are I/O-bound, concurrent, distributed, and probabilistic. They call multiple APIs simultaneously, handle streaming responses, validate unpredictable LLM outputs, and fail in ways that standard exception handling does not anticipate.
This guide focuses on the Python skills that separate tutorial-level GenAI code from production-ready code. It addresses what changes when you move from a notebook to a production API, and what patterns senior engineers use that junior engineers often miss.
2. Real-World Problem Context
The Gap Between Tutorial Code and Production Code
A typical LangChain tutorial:
```python
# Tutorial pattern — synchronous, no error handling, no types
from langchain_openai import ChatOpenAI

llm = ChatOpenAI()
response = llm.invoke("What is RAG?")
print(response.content)
```

A production pattern for the same operation:

```python
# Production pattern — async, typed, error handling, observability
import asyncio
import logging
from typing import Optional

from openai import AsyncOpenAI, RateLimitError, APITimeoutError
from pydantic import BaseModel

logger = logging.getLogger(__name__)


class LLMResponse(BaseModel):
    content: str
    model: str
    prompt_tokens: int
    completion_tokens: int


async def query_llm(
    prompt: str,
    model: str = "gpt-4o-mini",
    max_retries: int = 3,
    timeout_seconds: float = 30.0,
) -> Optional[LLMResponse]:
    client = AsyncOpenAI()

    for attempt in range(max_retries):
        try:
            response = await asyncio.wait_for(
                client.chat.completions.create(
                    model=model,
                    messages=[{"role": "user", "content": prompt}],
                    temperature=0,
                ),
                timeout=timeout_seconds,
            )
            return LLMResponse(
                content=response.choices[0].message.content,
                model=response.model,
                prompt_tokens=response.usage.prompt_tokens,
                completion_tokens=response.usage.completion_tokens,
            )
        except RateLimitError:
            wait_time = 2 ** attempt
            logger.warning(f"Rate limited, waiting {wait_time}s (attempt {attempt + 1})")
            await asyncio.sleep(wait_time)
        except APITimeoutError:
            logger.error(f"LLM timeout on attempt {attempt + 1}")
            if attempt == max_retries - 1:
                raise
    return None
```

The difference is not stylistic. The tutorial code is unsuitable for production because:
- It blocks the event loop during the API call
- It has no retry logic for rate limits (which happen regularly at scale)
- It has no timeout (the API can hang indefinitely)
- It returns unstructured data that requires error-prone string manipulation
- It logs nothing, making debugging in production impossible
Each of these failures corresponds to a real category of Python knowledge: async programming, retry patterns, type safety, and observability.
Why GenAI Systems Are Different
They are I/O-bound, not CPU-bound. The bottleneck in a RAG pipeline is almost never CPU computation. It is waiting for API responses: embedding calls, LLM generation, vector database queries. Python’s async/await model is designed for exactly this pattern. Using synchronous code forfeits the performance available from concurrency.
They interact with probabilistic systems. Standard Python exception handling assumes that functions either succeed or raise an exception. LLM responses can succeed (no exception raised) but contain malformed JSON, fail to follow format instructions, or produce outputs that violate your schema. Standard try/except blocks do not catch these failures. Pydantic validation does.
They involve expensive operations. A single GPT-4o call can cost $0.03–0.10 depending on context length. A bug that causes the same prompt to be sent 100 times does not raise an exception — it silently costs money. Cost awareness requires explicit token counting and monitoring.
They operate under rate limits. Every LLM provider imposes rate limits. Production applications must implement retry logic with backoff. This is not optional.
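As a preview of the retry pattern detailed in Skill 3 below, the usual schedule is exponential with jitter: roughly 1s, 2s, 4s, 8s between attempts. Here is a minimal sketch of just the delay calculation (my illustration, not a library API):

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Exponential backoff with jitter: 1s, 2s, 4s, ... capped at `cap`, then randomized ±50%."""
    delay = min(base * (2 ** attempt), cap)
    return delay * (0.5 + random.random())


# Example: delays for the first four attempts (values vary because of jitter)
print([round(backoff_delay(a), 1) for a in range(4)])
```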
3. Core Concepts and Mental Model
The I/O-Bound Mental Model
The central mental model for Python in GenAI is the event loop. Python’s asyncio event loop is a single-threaded scheduler that manages concurrent I/O operations without the overhead of threads.
Event Loop (single thread):
```text
Time → │──LLM Call 1──│──────waiting──────│──LLM Call 1 response──│
       │──LLM Call 2──────────────────────│──LLM Call 2 response──│
       │──DB Query────│──DB response──│
       │──Embed──│──Embed response──│
```

Without async: each operation blocks the thread, total time = sum of all waits.
With async: while waiting for one operation’s response, the event loop executes other operations. Total time ≈ longest single operation, not the sum.
For a RAG pipeline that calls an embedding API (100ms), a vector database (50ms), and an LLM (500ms), the difference is:
- Synchronous: 650ms sequential
- Async (embedding and retrieval in parallel, then LLM): ~600ms, because the 50ms retrieval overlaps with the 100ms embedding
The gains are more dramatic when processing batches or handling multiple concurrent requests.
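A minimal, self-contained illustration of that timing claim, using asyncio.sleep to stand in for the 100ms/50ms/500ms calls above (no real APIs involved):

```python
import asyncio
import time


async def fake_call(seconds: float) -> None:
    await asyncio.sleep(seconds)  # stands in for waiting on an external API


async def main() -> None:
    start = time.perf_counter()
    await fake_call(0.10)   # embedding
    await fake_call(0.05)   # vector search
    await fake_call(0.50)   # LLM
    print(f"sequential: {time.perf_counter() - start:.2f}s")  # ~0.65s

    start = time.perf_counter()
    await asyncio.gather(fake_call(0.10), fake_call(0.05))    # overlap the first two
    await fake_call(0.50)
    print(f"concurrent: {time.perf_counter() - start:.2f}s")  # ~0.60s


asyncio.run(main())
```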
The Type Safety Mental Model
Python is dynamically typed. In production, this creates ambiguity about what functions accept and return, especially when dealing with LLM outputs that are inherently unstructured.
The mental model: treat type hints not just as documentation, but as contracts enforced by Pydantic validation at runtime. An LLM output validated through a Pydantic model fails loudly (raises ValidationError) when the LLM returns malformed output, instead of failing silently when you try to access a field that does not exist.
```python
# Without type safety:
response = llm.invoke(prompt)
data = json.loads(response.content)
name = data["name"]  # KeyError if LLM returns wrong format — discovered at runtime
```

```python
# With type safety:
from pydantic import BaseModel


class UserInfo(BaseModel):
    name: str
    email: str
    role: str


response = llm.with_structured_output(UserInfo).invoke(prompt)
# ValidationError raised immediately if LLM output doesn't match the schema
```

The Production-First Mental Model
Every piece of code you write for a GenAI system should answer: “What happens when this fails?”
LLM APIs fail. Rate limits are hit. Responses are malformed. Embeddings timeout. Vector databases refuse connections. The question is not whether failures will happen, but how your code handles them.
A production-first mental model treats failures as first-class concerns, not edge cases.
4. Step-by-Step Explanation
Skill 1: Async/Await Programming
Async programming in Python requires understanding three things: coroutines, the event loop, and how to compose concurrent operations.
Coroutines are not threads. async def creates a coroutine function that returns a coroutine object when called. It does not start execution. Execution only begins when the coroutine is awaited in the context of an event loop.
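A tiny illustration of that behavior (pure asyncio, no API calls): calling the coroutine function only creates an object; nothing executes until the event loop runs it.

```python
import asyncio


async def greet() -> str:
    return "hello"


coro = greet()            # creates a coroutine object — no code has run yet
print(type(coro))         # <class 'coroutine'>
print(asyncio.run(coro))  # execution happens only when the loop awaits it -> "hello"
```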
```python
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()


# A coroutine function
async def embed_text(text: str) -> list[float]:
    response = await client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return response.data[0].embedding


# Running coroutines concurrently — the key to performance
async def embed_many(texts: list[str]) -> list[list[float]]:
    # asyncio.gather runs all coroutines concurrently
    # Total time ≈ slowest single call, not sum of all calls
    embeddings = await asyncio.gather(*[embed_text(t) for t in texts])
    return list(embeddings)


# Controlling concurrency to avoid rate limits
async def embed_with_limit(
    texts: list[str],
    max_concurrent: int = 10
) -> list[list[float]]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def embed_with_semaphore(text: str) -> list[float]:
        async with semaphore:
            return await embed_text(text)

    return await asyncio.gather(*[embed_with_semaphore(t) for t in texts])
```

Common mistake: mixing sync and async code. Calling a synchronous blocking function inside an async context blocks the entire event loop, defeating the purpose of async.
```python
# Bad: blocks the event loop
async def bad_rag(query: str) -> str:
    embedding = embed_text_sync(query)          # Blocks the loop
    results = vector_db.search_sync(embedding)  # Blocks the loop
    return generate_response_sync(results)


# Good: everything is async
async def good_rag(query: str) -> str:
    embedding = await embed_text(query)
    results = await vector_db.search(embedding)
    return await generate_response(results)
```

If you must call synchronous code from an async context:
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=10)


async def run_sync_in_thread(sync_func, *args):
    # get_running_loop() is the idiomatic call from inside a coroutine
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, sync_func, *args)
```

Skill 2: Pydantic for Structured LLM Output
Pydantic is the most important library for production GenAI code. LLMs are instructed to return structured data (JSON), but they do not always comply. Pydantic validates the output and raises clear errors when validation fails.
```python
from pydantic import BaseModel, Field, field_validator
from typing import List, Optional
from enum import Enum


class Sentiment(str, Enum):
    positive = "positive"
    negative = "negative"
    neutral = "neutral"


class DocumentAnalysis(BaseModel):
    summary: str = Field(description="2-3 sentence summary")
    sentiment: Sentiment
    key_topics: List[str] = Field(min_length=1, max_length=10)
    confidence_score: float = Field(ge=0.0, le=1.0)
    requires_escalation: bool

    @field_validator("summary")
    @classmethod
    def summary_not_empty(cls, v: str) -> str:
        if not v.strip():
            raise ValueError("Summary cannot be empty")
        return v.strip()


# Using with OpenAI structured output
from openai import OpenAI

client = OpenAI()


def analyze_document(text: str) -> DocumentAnalysis:
    response = client.beta.chat.completions.parse(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Analyze the document and return structured data."},
            {"role": "user", "content": text}
        ],
        response_format=DocumentAnalysis,  # Pydantic model enforces the schema
    )
    # This raises ValidationError if LLM output doesn't match the schema
    return response.choices[0].message.parsed
```

Using Pydantic with LangChain:
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

llm = ChatOpenAI(model="gpt-4o-mini")
structured_llm = llm.with_structured_output(DocumentAnalysis)

prompt = ChatPromptTemplate.from_template("Analyze this document: {text}")
chain = prompt | structured_llm

result: DocumentAnalysis = chain.invoke({"text": document_text})
# result is a validated DocumentAnalysis instance
print(result.sentiment)  # Type-safe access
```

Skill 3: Retry Logic with Exponential Backoff
Rate limit errors (HTTP 429) are a fact of life with LLM APIs. They occur when you exceed tokens-per-minute or requests-per-minute limits. Every production LLM client needs retry logic.
```python
import asyncio
import random
import logging
from functools import wraps
from typing import TypeVar, Callable, Any

from openai import RateLimitError, APITimeoutError, APIConnectionError

T = TypeVar("T")
logger = logging.getLogger(__name__)


def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
    retryable: tuple = (RateLimitError, APITimeoutError, APIConnectionError),
):
    """
    Decorator for async functions that retries on transient failures.

    Jitter prevents the thundering herd problem: without jitter, all clients
    hit the rate limit simultaneously, then all retry simultaneously,
    causing the limit to be exceeded again.
    """
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exc = None
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except retryable as e:
                    last_exc = e
                    if attempt == max_retries:
                        logger.error(
                            f"{func.__name__} failed after {max_retries + 1} attempts: {e}"
                        )
                        raise

                    delay = min(base_delay * (2 ** attempt), max_delay)
                    if jitter:
                        delay *= (0.5 + random.random())

                    logger.warning(
                        f"{func.__name__} attempt {attempt + 1} failed ({type(e).__name__}), "
                        f"retrying in {delay:.1f}s"
                    )
                    await asyncio.sleep(delay)
        return wrapper
    return decorator


# Usage
@retry_with_backoff(max_retries=3, base_delay=1.0)
async def generate(prompt: str) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content
```

Skill 4: Generator Functions for Streaming
LLM streaming sends tokens to the user as they are generated, improving perceived latency even when total generation time is the same. Python generators are the natural abstraction.
```python
from typing import AsyncIterator

from openai import AsyncOpenAI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

client = AsyncOpenAI()
app = FastAPI()


async def stream_tokens(prompt: str) -> AsyncIterator[str]:
    """Stream LLM tokens as they arrive."""
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            yield delta.content


@app.get("/stream")
async def stream_endpoint(question: str) -> StreamingResponse:
    async def generate():
        async for token in stream_tokens(question):
            # Server-Sent Events format
            yield f"data: {token}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )
```

Skill 5: Context Managers for Resource Management
GenAI applications use multiple clients (OpenAI, Pinecone, database connections). Context managers ensure these resources are properly initialized and cleaned up.
```python
import os
from contextlib import asynccontextmanager

from fastapi import FastAPI
from pinecone import Pinecone
from openai import AsyncOpenAI

# Global resources
resources = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize resources on startup, clean up on shutdown."""
    # Startup
    resources["openai"] = AsyncOpenAI()
    pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
    resources["vector_index"] = pc.Index("production")

    yield  # Application runs here

    # Shutdown
    await resources["openai"].close()
    # Pinecone client does not need explicit cleanup


app = FastAPI(lifespan=lifespan)


# Access resources in route handlers
@app.post("/query")
async def query(request: QueryRequest):
    client = resources["openai"]
    index = resources["vector_index"]
    # ... use resources
```

5. Architecture and System View
How Python Components Connect in a RAG System
```text
FastAPI Application
        │
        ▼
Request Handler (validates input with Pydantic)
        │  async — asyncio.gather
   ┌────┴──────────────────────┐
   ▼                           ▼
Embedding Service          Cache Check
(async OpenAI SDK)         (Redis async client)
   │ vector                    │ cache miss
   ▼                           │
Vector DB  ◄───────────────────┘
(async Pinecone/Weaviate client)
        │ merged results
        ▼
LLM Generation (async OpenAI SDK + streaming)
        │ response
        ▼
Output Validation (Pydantic model)
```

📊 Visual Explanation
Sync vs Async RAG Pipeline Timing — both pipelines do the same work; the async version runs the embedding call and the cache check in parallel via asyncio.gather, trimming overall latency.
6. Practical Examples
Pattern 1: The RAG Query Handler (Production-Grade)
```python
import asyncio
import logging
from typing import Optional, List

from pydantic import BaseModel
from openai import AsyncOpenAI, RateLimitError, APITimeoutError

logger = logging.getLogger(__name__)


class QueryRequest(BaseModel):
    question: str
    top_k: int = 5
    temperature: float = 0.0


class QueryResponse(BaseModel):
    answer: str
    sources: List[str]
    tokens_used: int


class RAGService:
    def __init__(
        self,
        openai_client: AsyncOpenAI,
        vector_index,
        embedding_model: str = "text-embedding-3-small",
        generation_model: str = "gpt-4o-mini",
    ):
        self.openai = openai_client
        self.index = vector_index
        self.embed_model = embedding_model
        self.gen_model = generation_model

    async def _embed(self, text: str) -> list[float]:
        response = await self.openai.embeddings.create(
            model=self.embed_model,
            input=text
        )
        return response.data[0].embedding

    async def _retrieve(
        self,
        embedding: list[float],
        top_k: int,
        filter_dict: Optional[dict] = None
    ) -> list[dict]:
        # Note: the default Pinecone client is synchronous; in a fully async
        # service, run this in an executor or use an async-capable client
        results = self.index.query(
            vector=embedding,
            top_k=top_k,
            filter=filter_dict,
            include_metadata=True
        )
        return [
            {"text": r.metadata.get("text", ""), "source": r.metadata.get("source", "")}
            for r in results.matches
        ]

    async def _generate(
        self,
        question: str,
        context: list[dict],
        temperature: float
    ) -> tuple[str, int]:
        context_text = "\n\n".join(
            f"Source: {doc['source']}\n{doc['text']}"
            for doc in context
        )

        messages = [
            {
                "role": "system",
                "content": (
                    "Answer the question using only the provided context. "
                    "Cite sources. If the answer is not in the context, say so."
                )
            },
            {
                "role": "user",
                "content": f"Context:\n{context_text}\n\nQuestion: {question}"
            }
        ]

        response = await self.openai.chat.completions.create(
            model=self.gen_model,
            messages=messages,
            temperature=temperature
        )

        return (
            response.choices[0].message.content,
            response.usage.total_tokens
        )

    @retry_with_backoff(max_retries=3, base_delay=1.0)
    async def query(self, request: QueryRequest) -> QueryResponse:
        """Execute a RAG query with proper error handling."""
        # Retrieval depends on the query embedding, so these run sequentially
        embedding = await self._embed(request.question)
        chunks = await self._retrieve(embedding, request.top_k)

        if not chunks:
            return QueryResponse(
                answer="No relevant information found in the knowledge base.",
                sources=[],
                tokens_used=0
            )

        answer, tokens = await self._generate(
            request.question, chunks, request.temperature
        )

        sources = list({doc["source"] for doc in chunks if doc["source"]})

        return QueryResponse(answer=answer, sources=sources, tokens_used=tokens)
```

Pattern 2: Batch Processing with Semaphore
```python
import asyncio
import logging
from typing import List, TypeVar, Callable, Awaitable, AsyncIterator

T = TypeVar("T")
R = TypeVar("R")

logger = logging.getLogger(__name__)


async def process_batch(
    items: List[T],
    process_func: Callable[[T], Awaitable[R]],
    max_concurrent: int = 10,
    delay_between_batches: float = 0.1
) -> AsyncIterator[R]:
    """
    Process a list of items with bounded concurrency.
    Used for bulk embedding, batch document processing, etc.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_with_semaphore(item: T) -> R:
        async with semaphore:
            return await process_func(item)

    # Process in chunks to avoid overwhelming the event loop
    chunk_size = max_concurrent * 5
    for i in range(0, len(items), chunk_size):
        chunk = items[i:i + chunk_size]
        results = await asyncio.gather(
            *[process_with_semaphore(item) for item in chunk],
            return_exceptions=True
        )

        for result in results:
            if isinstance(result, Exception):
                logger.error(f"Batch item failed: {result}")
            else:
                yield result

        if i + chunk_size < len(items):
            await asyncio.sleep(delay_between_batches)


# Usage: embed 10,000 documents with max 10 concurrent API calls
async def embed_corpus(documents: List[str]) -> List[list[float]]:
    async def embed_one(text: str) -> list[float]:
        response = await openai_client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding

    embeddings = []
    async for embedding in process_batch(documents, embed_one, max_concurrent=10):
        embeddings.append(embedding)
    return embeddings
```

Pattern 3: Token Counting for Cost Control
```python
import tiktoken
from dataclasses import dataclass, field
from threading import Lock


@dataclass
class TokenUsageTracker:
    model: str
    prompt_tokens: int = 0
    completion_tokens: int = 0
    _lock: Lock = field(default_factory=Lock, repr=False)

    # Approximate pricing per 1M tokens (verify current pricing)
    PRICING = {
        "gpt-4o": {"prompt": 2.50, "completion": 10.00},
        "gpt-4o-mini": {"prompt": 0.15, "completion": 0.60},
        "gpt-3.5-turbo": {"prompt": 0.50, "completion": 1.50},
    }

    def add_usage(self, prompt: int, completion: int) -> None:
        with self._lock:
            self.prompt_tokens += prompt
            self.completion_tokens += completion

    @property
    def estimated_cost_usd(self) -> float:
        pricing = self.PRICING.get(self.model, {"prompt": 0, "completion": 0})
        return (
            self.prompt_tokens * pricing["prompt"] / 1_000_000
            + self.completion_tokens * pricing["completion"] / 1_000_000
        )


def count_tokens(text: str, model: str = "gpt-4o-mini") -> int:
    """Count tokens before making an API call — catches context overflow early."""
    encoding_map = {
        "gpt-4o": "o200k_base",
        "gpt-4o-mini": "o200k_base",
        "gpt-3.5-turbo": "cl100k_base",
    }
    encoding_name = encoding_map.get(model, "cl100k_base")
    encoding = tiktoken.get_encoding(encoding_name)
    return len(encoding.encode(text))


def build_prompt_safely(
    context_chunks: list[str],
    question: str,
    system_prompt: str,
    model: str = "gpt-4o-mini",
    max_context_tokens: int = 3000
) -> list[dict]:
    """Build a prompt that respects the context budget."""
    included_chunks = []
    current_tokens = count_tokens(system_prompt + question, model)

    for chunk in context_chunks:
        chunk_tokens = count_tokens(chunk, model)
        if current_tokens + chunk_tokens > max_context_tokens:
            break
        included_chunks.append(chunk)
        current_tokens += chunk_tokens

    context = "\n\n".join(included_chunks)

    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
    ]
```

7. Trade-offs, Limitations, and Failure Modes
Failure Mode 1: The Sync Trap in Async Code
Symptom: Your async RAG service handles requests correctly, but response times are worse than a synchronous implementation.
Root Cause: A synchronous database call or CPU-intensive operation inside an async handler blocks the event loop, preventing other coroutines from running.
How to detect:
```python
import asyncio
import time


# This blocks the event loop — all other requests wait
async def bad_handler(query: str) -> str:
    embedding = create_embedding_sync(query)  # BLOCKS
    time.sleep(0.1)                           # BLOCKS
    results = db.query_sync(embedding)        # BLOCKS
    return generate_sync(results)             # BLOCKS
```

Mitigation: Profile with asyncio.get_event_loop().set_debug(True), which logs coroutines that take longer than 100ms. Move CPU-intensive work to run_in_executor.
Failure Mode 2: Pydantic Validation Catching Real LLM Issues
Symptom: ValidationError exceptions appearing in production logs, more frequently with certain types of queries.
This is actually the system working correctly. The LLM is returning invalid output, and Pydantic is catching it before it causes a downstream data corruption issue. The failure mode to handle:
```python
from typing import Optional

from pydantic import ValidationError


async def extract_entities(text: str) -> Optional[ExtractedEntities]:
    try:
        result = await structured_llm.ainvoke(text)
        return result
    except ValidationError as e:
        # Log the validation error with the raw LLM output for debugging
        logger.warning(
            "LLM output validation failed",
            extra={"validation_errors": e.errors(), "text_length": len(text)}
        )
        # Return None and let the caller decide how to handle it
        return None
```

Root causes of validation failures: ambiguous prompts, context that confuses the LLM, and edge cases the few-shot examples don’t cover. Use these failures as training data for improving prompts.
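One hedged follow-up pattern (my addition, not from the original guide): when validation fails, retry once with the validation errors appended to the prompt so the model can correct the specific fields it got wrong. structured_llm and ExtractedEntities are the same placeholders used above.

```python
import logging
from typing import Optional

from pydantic import ValidationError

logger = logging.getLogger(__name__)


async def extract_with_repair(text: str, max_attempts: int = 2) -> Optional[ExtractedEntities]:
    prompt = text
    for attempt in range(max_attempts):
        try:
            return await structured_llm.ainvoke(prompt)
        except ValidationError as e:
            logger.warning("Validation failed on attempt %d: %s", attempt + 1, e.errors())
            # Feed the errors back so the next attempt can fix them
            prompt = (
                f"{text}\n\nYour previous answer failed validation with these errors:\n"
                f"{e.errors()}\nReturn output that satisfies the schema exactly."
            )
    return None
```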
Failure Mode 3: Memory Leaks from Unclosed Async Clients
Symptom: Memory usage increases steadily. RuntimeWarning: Enable tracemalloc to get the object allocation traceback.
Root Cause: Creating a new AsyncOpenAI() (or other async SDK client, such as an async vector-database client) per request without closing it. Each client maintains its own connection pool.
```python
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request
from openai import AsyncOpenAI


# Bad: new client per request (leaks connections)
@app.post("/query")
async def query_endpoint(request: QueryRequest) -> QueryResponse:
    client = AsyncOpenAI()  # New client, new connection pool
    result = await rag_service.query(client, request)
    # Client not closed — connections leaked
    return result


# Good: single shared client, proper lifecycle management
@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.openai_client = AsyncOpenAI()
    yield
    await app.state.openai_client.close()


app = FastAPI(lifespan=lifespan)


@app.post("/query")
async def query_endpoint(request: QueryRequest, req: Request) -> QueryResponse:
    client = req.app.state.openai_client
    return await rag_service.query(client, request)
```

Failure Mode 4: Type Hint False Security
Symptom: Type errors at runtime that your type hints should have caught.
Root Cause: Type hints in Python are not enforced at runtime by the interpreter. They are only checked by static type checkers (mypy, Pyright) or runtime validators (Pydantic).
```python
# This runs without error at runtime, despite the type hint violation
def process(text: str) -> list[float]:
    return "not a list"  # No runtime error unless you check
```

```python
# Pydantic enforces types at runtime:
from pydantic import BaseModel


class Config(BaseModel):
    embedding: list[float]
    temperature: float


Config(embedding="not a list", temperature=1.0)  # ValidationError raised
```

Mitigation: Use Pydantic for any data that crosses a system boundary (LLM output, API request, database response). Use mypy or Pyright in CI for static analysis.
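As a complement (my addition, assuming Pydantic v2), the same runtime enforcement is available for plain types at a boundary without defining a full model, via TypeAdapter:

```python
from pydantic import TypeAdapter, ValidationError

embedding_adapter = TypeAdapter(list[float])

print(embedding_adapter.validate_python([0.1, 0.2, 0.3]))  # passes: [0.1, 0.2, 0.3]

try:
    embedding_adapter.validate_python("not a list")  # fails loudly at runtime
except ValidationError as e:
    print(e.errors()[0]["msg"])
```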
8. Interview Perspective
What Interviewers Test in Python Interviews for GenAI Roles
Python questions in GenAI interviews are not generic Python questions. They are production systems questions applied to AI contexts.
Common question categories:
Concurrency: “How would you process 10,000 documents to generate embeddings as quickly as possible while staying within API rate limits?”
Expected answer demonstrates: asyncio.gather for concurrency, asyncio.Semaphore for rate limiting, asyncio.wait_for for timeouts, batch processing to amortize overhead.
Type safety: “How do you ensure that LLM output is structured and validated?”
Expected answer demonstrates: Pydantic models with BaseModel, OpenAI’s response_format with Pydantic, with_structured_output() in LangChain, handling ValidationError gracefully.
Error handling: “Implement a function that calls the OpenAI API with proper retry logic for rate limits.”
Expected answer demonstrates: exponential backoff, jitter, distinguishing retryable errors (429, 503) from non-retryable ones (400 bad request), circuit breaker pattern at scale.
Performance: “Explain how you would reduce the latency of a RAG pipeline from 2 seconds to under 1 second without changing the LLM model.”
Expected answer: async parallel embedding + retrieval, semantic caching for repeated queries, streaming to improve perceived latency, query rewriting to reduce context size.
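As one concrete illustration of the caching point — a simplified exact-match cache rather than true semantic caching; generate is the retry-wrapped helper from Skill 3:

```python
import hashlib

_answer_cache: dict[str, str] = {}


async def cached_answer(question: str) -> str:
    key = hashlib.sha256(question.strip().lower().encode()).hexdigest()
    if key in _answer_cache:
        return _answer_cache[key]      # cache hit: no LLM call, no latency, no cost
    answer = await generate(question)  # falls through to the LLM on a miss
    _answer_cache[key] = answer
    return answer
```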
What signals seniority:
- Discussing asyncio.Semaphore for rate limiting without being asked
- Mentioning that type hints are not enforced at runtime and how to handle this
- Explaining how asyncio.gather's exception behavior changes with return_exceptions=True
- Discussing connection pool management
- Mentioning tiktoken for pre-flight token counting to prevent context overflow errors
9. Production Perspective
What Production GenAI Code Looks Like vs. Tutorials
Tutorial code characteristics:
- Synchronous
- No error handling
- Inline configuration (API keys hardcoded or in simple environment variables)
- No logging
- No type hints
- Single file
Production code characteristics:
- Async throughout
- Structured error handling with specific exception types
- Configuration via Pydantic BaseSettings with validation
- Structured logging (JSON format for log aggregation)
- Comprehensive type hints with Pydantic validation
- Modular structure with separation of concerns
Production configuration pattern:
```python
from pydantic_settings import BaseSettings
from pydantic import SecretStr, field_validator


class Settings(BaseSettings):
    openai_api_key: SecretStr
    pinecone_api_key: SecretStr
    pinecone_index_name: str
    embedding_model: str = "text-embedding-3-small"
    generation_model: str = "gpt-4o-mini"
    max_concurrent_requests: int = 20
    request_timeout_seconds: float = 30.0
    max_retries: int = 3

    @field_validator("max_concurrent_requests")
    @classmethod
    def validate_concurrency(cls, v: int) -> int:
        if v < 1 or v > 100:
            raise ValueError("max_concurrent_requests must be between 1 and 100")
        return v

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"


# Used throughout the application
settings = Settings()
```

Production logging pattern:
```python
import logging
import json
from datetime import datetime


class JSONFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        if hasattr(record, "extra"):
            log_data.update(record.extra)
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_data)


# Structured logging that works with Datadog, CloudWatch, Splunk
logging.basicConfig(handlers=[logging.StreamHandler()])
logging.getLogger().handlers[0].setFormatter(JSONFormatter())
```

Libraries You Should Know Deeply
| Library | What to know | Depth needed |
|---|---|---|
| asyncio | gather, Semaphore, wait_for, event loop | Deep |
| pydantic | BaseModel, validators, Settings, ValidationError | Deep |
| openai (SDK) | Async client, streaming, structured output, error types | Deep |
| tiktoken | Token counting per model, encoding types | Moderate |
| fastapi | Async endpoints, Depends, lifespan, Request | Moderate |
| langchain_core | Runnable, LCEL, BaseRetriever | Moderate |
| httpx | Async HTTP client for non-OpenAI APIs | Basic |
10. Summary and Key Takeaways
The Five Production Python Skills

- Async/await mastery. Use asyncio.gather for parallel I/O, asyncio.Semaphore for rate limiting, and asyncio.wait_for for timeouts. Never block the event loop with synchronous code inside async handlers.
- Pydantic for all boundaries. Validate LLM output, API requests, and configuration with Pydantic models. Use ValidationError as a signal to improve prompts, not just a failure to suppress.
- Retry with exponential backoff. Implement jitter, distinguish retryable from non-retryable errors, and log all retry attempts for observability. Every LLM API call needs retry logic.
- Type hints as contracts. Use type hints consistently. Understand that they require Pydantic or mypy to be enforced. Treat unvalidated LLM output as Any until validated through a model.
- Resource lifecycle management. Create async clients once at application startup, share them across requests, and close them on shutdown. Connection leaks are common and difficult to debug.
Self-Evaluation: Production Readiness Checklist
Before calling your code production-ready, verify:
- All LLM API calls are async
- All async functions have timeout handling
- Rate limit errors trigger exponential backoff with jitter
- LLM outputs are validated with Pydantic before use
- Token counts are checked before API calls to prevent context overflow
- HTTP clients are initialized once and shared, not created per request
- All errors are logged with structured data (exception type, context, retry attempt)
- Configuration is validated at startup with pydantic_settings
Final Thought
The Python skills that matter for GenAI engineering are not academic. They directly affect whether your system handles the Tuesday traffic spike or silently fails, whether it costs $200/month or $2,000/month, and whether a junior engineer can debug a production incident at 2am.
Python mastery for GenAI is not about knowing every language feature. It is about deeply knowing the patterns that production systems require: concurrency, validation, error handling, and observability. These patterns, applied consistently, are the difference between tutorial code and production code.
Related
- AI Agents and Agentic Systems — The async patterns in this guide are foundational for building production agent systems
- Essential GenAI Tools — The full production tool stack that these Python patterns connect to
- LangChain vs LangGraph — Apply async/await and Pydantic patterns to both LangChain chains and LangGraph state machines
- Agentic Patterns — Production Python is required to implement ReAct, reflection, and tool use reliably
- AI Coding Environments — How Cursor, Claude Code, and GitHub Copilot assist with writing production-quality Python
Last updated: February 2026. Python recommendations reflect production patterns for LLM application development using Python 3.10+ with modern async patterns.