Graceful API Call Failure 101 for Data Scientists
How to methodically deal with API call failures in long-running pipelines
Read a better-formatted version on Medium.
First of all, what is unique about handling failures in data science? Isn't proper failure handling a crucial practice in programming in general? Well, yes, but too many data scientists are oblivious to, or dismissive of, software engineering practices in the name of "data science".
Secondly, general error handling is easy, at least in Python, the lingua franca of data science: just throw in a try-except block. Failure handling for APIs is more nuanced, though, for two reasons:

- some failures are transient (an overloaded server, a network timeout) and are worth retrying after a waiting period;
- other failures are permanent (a request that is simply too large) and must be handled differently, because retrying will never help.

Here we focus on these aspects using a very relevant, real-world example: calling LLM APIs.
Here is a piece of code we wrote using the google-genai Python SDK.
class GeminiClient:
    def generate(self, ...):
        ...
        response = self._client.models.generate_content(
            model=self._model.value,
            contents=attached_files + [prompt],
            config=types.GenerateContentConfig(
                max_output_tokens=max_tokens,
                system_instruction=system_prompt or None,
            ),
        )
        ...
The whole code can be viewed here. It uses Google's Gemini API to extract information from PDF files passed as attachments to a prompt. When we run this code on a few thousand files, it generally runs well, except:

- occasionally the model is overloaded and the server returns a 503 error;
- occasionally a file upload times out;
- occasionally a file is too large for the request, and the API returns a 400 error.

Without any error handling, the first such failure aborts the run, and the entire pipeline containing the code has to be resumed manually. To handle these scenarios we need more sophisticated failure handling:

- transient failures (overload, timeouts) should be retried after a waiting period;
- permanent failures (oversized files) should be skipped gracefully, so the rest of the pipeline can proceed.
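Inlined at the call site, that logic would look roughly like the following sketch. This is our illustration, not the actual code: the function name is ours, and is_too_large and is_transient are placeholder predicates standing in for the error-classification checks we develop later in this article.

import time

# Placeholder predicates; the real classification logic comes later.
def is_too_large(e: Exception) -> bool: ...
def is_transient(e: Exception) -> bool: ...

def generate_with_inline_handling(client, prompt, attached_files, backoffs=(30, 60)):
    last_exc = None
    for backoff in backoffs:
        try:
            try:
                return client.generate(prompt, attached_files)
            except Exception as e:
                if is_too_large(e):        # permanent failure: skip gracefully
                    return "File too large to be processed."
                raise                      # let the outer handler classify it
        except Exception as e:
            if not is_transient(e):        # not retryable: fail fast
                raise
            last_exc = e
            time.sleep(backoff)            # transient: wait, then retry
    raise last_exc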
As the sketch shows, writing this logic inline in try-except blocks makes the code deeply indented and difficult to follow. Instead, we can use Python's decorator mechanism to wrap the existing function call in error-handling logic. A wrapped call will then look like this:
class GeminiClient:
    @timeout_wrapper(retry_backoffs=[...], when=...)   # when: Callable[[Exception], bool]
    @size_exceeded_wrapper(when=...)                   # when: Callable[[Exception], bool]
    def generate(self, ...):
        ...
        response = self._client.models.generate_content(
            model=self._model.value,
            contents=attached_files + [prompt],
            config=types.GenerateContentConfig(
                max_output_tokens=max_tokens,
                system_instruction=system_prompt or None,
            ),
        )
        ...
You may read this primer on decorators before following along on the implementation journey below.
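As a one-screen refresher (this toy log_calls example is ours, not from the article's codebase): a decorator is a function that takes a function and returns a wrapped version of it.

import functools

def log_calls(func):
    # A decorator: takes a function, returns a wrapped version of it.
    @functools.wraps(func)  # preserve func's name and docstring on the wrapper
    def wrapper(*args, **kwargs):
        print(f"calling {func.__name__}")
        return func(*args, **kwargs)
    return wrapper

@log_calls
def add(a, b):
    return a + b

add(1, 2)  # prints "calling add", then returns 3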
Our two wrappers will look and feel a bit different from each other, but both ultimately call the generate method. timeout_wrapper takes a list of backoff periods, for which it waits before retrying, and a filter function that determines whether an exception warrants a retry. size_exceeded_wrapper takes only a filter function.
The timeout wrapper needs to store its parameters somewhere, so it can be implemented either as a class or, more simply, as a nested function, like this:
import functools
import time
from typing import Callable, List

def retry_with_backoff(backoffs: List[int], when: Callable[[Exception], bool]):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            last_exc = None
            for backoff in backoffs:
                try:
                    return func(self, *args, **kwargs)
                except Exception as e:
                    if not when(e):
                        raise
                    # Remember the failure, wait, and fall through to retry
                    last_exc = e
                    time.sleep(backoff)
            # Re-raise, if we exhaust all backoffs without success
            raise last_exc
        return wrapper
    return decorator
The nested functions decorator and wrapper are the standard way of implementing a decorator in Python. The outermost function, retry_with_backoff, is required to capture the external parameters we want to set.
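To see the retry behavior in action, here is a toy usage sketch (the FlakyService class is ours, purely for illustration): a method that fails once with a TimeoutError and then succeeds, decorated to retry timeouts with backoffs of 1 and 2 seconds.

class FlakyService:
    def __init__(self):
        self.calls = 0

    @retry_with_backoff(backoffs=[1, 2], when=lambda e: isinstance(e, TimeoutError))
    def fetch(self):
        self.calls += 1
        if self.calls < 2:
            raise TimeoutError("transient failure")
        return "ok"

service = FlakyService()
print(service.fetch())  # first attempt fails, sleeps 1s, second attempt returns "ok"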
The size exceeded wrapper can look like this:
def skip_silently(when: Callable[[Exception], bool]):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                if not when(e):
                    raise
                # Return a placeholder response instead of failing the pipeline
                return (
                    "Observations/Remarks:\n\n"
                    "File too large to be processed.\n\n"
                    "```markdown\n\n```"
                )
        return wrapper
    return decorator
This is simpler, but we may later add a strategy to split the oversized file and process each segment in a separate call, as sketched below.
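Such a strategy might look something like this sketch. Note that the process_in_segments decorator, the split argument, and the assumption that the wrapped method takes a single file path are all ours, for illustration only, and do not match the actual generate signature.

def process_in_segments(split: Callable[[str], List[str]]):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, file_path, *args, **kwargs):
            try:
                return func(self, file_path, *args, **kwargs)
            except Exception as e:
                if not _is_file_size_exceeded(e):
                    raise
                # Hypothetical fallback: split the file (e.g., a PDF into
                # page ranges) and process each segment in a separate call.
                segments = split(file_path)
                return "\n\n".join(
                    func(self, segment, *args, **kwargs) for segment in segments
                )
        return wrapper
    return decorator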
With the failure-handling logic fleshed out, the filter functions are straightforward. For the timeout wrapper, we define _is_retryable as the filter, which is itself composed of two more granular filters:
from google.genai.errors import ServerError
from httpx import ReadTimeout

def _is_server_overloaded(e: Exception):
    return (
        isinstance(e, ServerError)
        and e.code == 503
        and str(e).find("The model is overloaded") >= 0
    )

def _is_file_io_timeout(e: Exception):
    return isinstance(e, ReadTimeout)

def _is_retryable(e: Exception):
    return _is_server_overloaded(e) or _is_file_io_timeout(e)
For the size exceeded wrapper we have a simpler filter, _is_file_size_exceeded:
from google.genai.errors import ClientError

def _is_file_size_exceeded(e: Exception):
    return (
        isinstance(e, ClientError)
        and e.code == 400
        and str(e).find(
            "The request's total referenced files bytes are too large to be read"
        ) >= 0
    )
Here’s how we decorate the generate method, more concretely:
class GeminiClient:
    @retry_with_backoff([30, 60], when=_is_retryable)
    @skip_silently(when=_is_file_size_exceeded)
    def generate(self, ...):
        ...
        response = self._client.models.generate_content(
            model=self._model.value,
            contents=attached_files + [prompt],
            config=types.GenerateContentConfig(
                max_output_tokens=max_tokens,
                system_instruction=system_prompt or None,
            ),
        )
        ...
The diff can be viewed here.
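With both decorators in place, a driver loop over thousands of files can run unattended. Here is a hypothetical sketch (the file list, the prompt, the no-argument constructor, and the generate signature are ours, assumed for illustration): transient 503s and timeouts are retried inside generate, and oversized files come back as the placeholder string instead of raising.

client = GeminiClient()
results = {}
for pdf in pdf_files:
    results[pdf] = client.generate(
        prompt="Extract the key fields from this document.",
        attached_files=[pdf],
    )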
To summarize, here's how this implementation addresses the two issues with API failures that we pointed out at the beginning:

- transient failures (server overload, upload timeouts) are retried automatically with increasing backoff periods, so the pipeline recovers on its own;
- permanent failures (oversized files) are skipped with a placeholder response, so one bad file no longer forces a manual restart of the whole pipeline.

And because the logic lives in decorators, the core generate method stays flat and readable.