Skip to content

Commit

Permalink
feat(api): add message batches api
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertCraigie committed Oct 8, 2024
1 parent 59f2088 commit cd1ffcb
Show file tree
Hide file tree
Showing 73 changed files with 6,311 additions and 33 deletions.
4 changes: 2 additions & 2 deletions .stats.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
configured_endpoints: 3
openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/anthropic-286f00929e2a4d28d991e6a7e660fa801dca7ec91d8ecb2fc17654bb8173eb0d.yml
configured_endpoints: 9
openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/anthropic-aedee7570aa925baba404fc5bd3c8c1fffe8845517e492751db9b175c5cae9da.yml
115 changes: 115 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,50 @@ message.usage
# Usage(input_tokens=25, output_tokens=13)
```

## Message Batches

This SDK provides beta support for the [Message Batches API](https://docs.anthropic.com/en/docs/build-with-claude/message-batches) under the `client.beta.messages.batches` namespace.


### Creating a batch

Message Batches take the exact same request params as the standard Messages API:

```python
await client.beta.messages.batches.create(
requests=[
{
"custom_id": "my-first-request",
"params": {
"model": "claude-3-5-sonnet-20240620",
"max_tokens": 1024,
"messages": [{"role": "user", "content": "Hello, world"}],
},
},
{
"custom_id": "my-second-request",
"params": {
"model": "claude-3-5-sonnet-20240620",
"max_tokens": 1024,
"messages": [{"role": "user", "content": "Hi again, friend"}],
},
},
]
)
```


### Getting results from a batch

Once a Message Batch has been processed, indicated by `.processing_status === 'ended'`, you can access the results with `.batches.results()`

```python
result_stream = await client.beta.messages.batches.results(batch_id)
async for entry in result_stream:
if entry.result.type == "succeeded":
print(entry.result.message.content)
```

## Tool use

This SDK provides support for tool use, aka function calling. More details can be found in [the documentation](https://docs.anthropic.com/claude/docs/tool-use).
Expand Down Expand Up @@ -250,6 +294,77 @@ Nested request parameters are [TypedDicts](https://docs.python.org/3/library/typ

Typed requests and responses provide autocomplete and documentation within your editor. If you would like to see type errors in VS Code to help catch bugs earlier, set `python.analysis.typeCheckingMode` to `basic`.

## Pagination

List methods in the Anthropic API are paginated.

This library provides auto-paginating iterators with each list response, so you do not have to request successive pages manually:

```python
from anthropic import Anthropic

client = Anthropic()

all_batches = []
# Automatically fetches more pages as needed.
for batch in client.beta.messages.batches.list(
limit=20,
):
# Do something with batch here
all_batches.append(batch)
print(all_batches)
```

Or, asynchronously:

```python
import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()


async def main() -> None:
all_batches = []
# Iterate through items across all pages, issuing requests as needed.
async for batch in client.beta.messages.batches.list(
limit=20,
):
all_batches.append(batch)
print(all_batches)


asyncio.run(main())
```

Alternatively, you can use the `.has_next_page()`, `.next_page_info()`, or `.get_next_page()` methods for more granular control working with pages:

```python
first_page = await client.beta.messages.batches.list(
limit=20,
)
if first_page.has_next_page():
print(f"will fetch next page using these details: {first_page.next_page_info()}")
next_page = await first_page.get_next_page()
print(f"number of items we just fetched: {len(next_page.data)}")

# Remove `await` for non-async usage.
```

Or just work directly with the returned data:

```python
first_page = await client.beta.messages.batches.list(
limit=20,
)

print(f"next page cursor: {first_page.last_id}") # => "next page cursor: ..."
for batch in first_page.data:
print(batch.id)

# Remove `await` for non-async usage.
```

## Handling errors

When the library is unable to connect to the API (for example, due to network connection problems or a timeout), a subclass of `anthropic.APIConnectionError` is raised.
Expand Down
83 changes: 83 additions & 0 deletions api.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,89 @@ Methods:

# Beta

Types:

```python
from anthropic.types import (
AnthropicBeta,
BetaAPIError,
BetaAuthenticationError,
BetaError,
BetaErrorResponse,
BetaInvalidRequestError,
BetaNotFoundError,
BetaOverloadedError,
BetaPermissionError,
BetaRateLimitError,
)
```

## Messages

Types:

```python
from anthropic.types.beta import (
BetaCacheControlEphemeral,
BetaContentBlock,
BetaContentBlockParam,
BetaImageBlockParam,
BetaInputJSONDelta,
BetaMessage,
BetaMessageDeltaUsage,
BetaMessageParam,
BetaMetadata,
BetaRawContentBlockDeltaEvent,
BetaRawContentBlockStartEvent,
BetaRawContentBlockStopEvent,
BetaRawMessageDeltaEvent,
BetaRawMessageStartEvent,
BetaRawMessageStopEvent,
BetaRawMessageStreamEvent,
BetaTextBlock,
BetaTextBlockParam,
BetaTextDelta,
BetaTool,
BetaToolChoice,
BetaToolChoiceAny,
BetaToolChoiceAuto,
BetaToolChoiceTool,
BetaToolResultBlockParam,
BetaToolUseBlock,
BetaToolUseBlockParam,
BetaUsage,
)
```

Methods:

- <code title="post /v1/messages?beta=true">client.beta.messages.<a href="./src/anthropic/resources/beta/messages/messages.py">create</a>(\*\*<a href="src/anthropic/types/beta/message_create_params.py">params</a>) -> <a href="./src/anthropic/types/beta/beta_message.py">BetaMessage</a></code>

### Batches

Types:

```python
from anthropic.types.beta.messages import (
BetaMessageBatch,
BetaMessageBatchCanceledResult,
BetaMessageBatchErroredResult,
BetaMessageBatchExpiredResult,
BetaMessageBatchIndividualResponse,
BetaMessageBatchRequestCounts,
BetaMessageBatchResult,
BetaMessageBatchSucceededResult,
)
```

Methods:

- <code title="post /v1/messages/batches?beta=true">client.beta.messages.batches.<a href="./src/anthropic/resources/beta/messages/batches.py">create</a>(\*\*<a href="src/anthropic/types/beta/messages/batch_create_params.py">params</a>) -> <a href="./src/anthropic/types/beta/messages/beta_message_batch.py">BetaMessageBatch</a></code>
- <code title="get /v1/messages/batches/{message_batch_id}?beta=true">client.beta.messages.batches.<a href="./src/anthropic/resources/beta/messages/batches.py">retrieve</a>(message_batch_id) -> <a href="./src/anthropic/types/beta/messages/beta_message_batch.py">BetaMessageBatch</a></code>
- <code title="get /v1/messages/batches?beta=true">client.beta.messages.batches.<a href="./src/anthropic/resources/beta/messages/batches.py">list</a>(\*\*<a href="src/anthropic/types/beta/messages/batch_list_params.py">params</a>) -> <a href="./src/anthropic/types/beta/messages/beta_message_batch.py">SyncPage[BetaMessageBatch]</a></code>
- <code title="post /v1/messages/batches/{message_batch_id}/cancel?beta=true">client.beta.messages.batches.<a href="./src/anthropic/resources/beta/messages/batches.py">cancel</a>(message_batch_id) -> <a href="./src/anthropic/types/beta/messages/beta_message_batch.py">BetaMessageBatch</a></code>
- <code title="get /v1/messages/batches/{message_batch_id}/results?beta=true">client.beta.messages.batches.<a href="./src/anthropic/resources/beta/messages/batches.py">results</a>(message_batch_id) -> BinaryAPIResponse</code>

## PromptCaching

### Messages
Expand Down
19 changes: 19 additions & 0 deletions examples/batch_results.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import sys
import time

import rich

from anthropic import Anthropic

client = Anthropic()

try:
batch_id = sys.argv[1]
except IndexError as exc:
raise RuntimeError("must specify a message batch ID, `python examples/batch_results.py msgbatch_123`") from exc

s = time.monotonic()

result_stream = client.beta.messages.batches.results(batch_id)
for result in result_stream:
rich.print(result)
101 changes: 101 additions & 0 deletions src/anthropic/_decoders/jsonl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from __future__ import annotations

import json
from typing_extensions import Generic, TypeVar, Iterator, AsyncIterator

import httpx

from .._models import construct_type_unchecked

_T = TypeVar("_T")


class JSONLDecoder(Generic[_T]):
"""A decoder for [JSON Lines](https://jsonlines.org) format.
This class provides an iterator over a byte-iterator that parses each JSON Line
into a given type.
"""

http_response: httpx.Response | None
"""The HTTP response this decoder was constructed from"""

def __init__(
self, *, raw_iterator: Iterator[bytes], line_type: type[_T], http_response: httpx.Response | None
) -> None:
super().__init__()
self.http_response = http_response
self._raw_iterator = raw_iterator
self._line_type = line_type
self._iterator = self.__decode__()

def __decode__(self) -> Iterator[_T]:
buf = b""
for chunk in self._raw_iterator:
for line in chunk.splitlines(keepends=True):
buf += line
if buf.endswith((b"\r", b"\n", b"\r\n")):
yield construct_type_unchecked(
value=json.loads(buf),
type_=self._line_type,
)
buf = b""

# flush
if buf:
yield construct_type_unchecked(
value=json.loads(buf),
type_=self._line_type,
)

def __next__(self) -> _T:
return self._iterator.__next__()

def __iter__(self) -> Iterator[_T]:
for item in self._iterator:
yield item


class AsyncJSONLDecoder(Generic[_T]):
"""A decoder for [JSON Lines](https://jsonlines.org) format.
This class provides an async iterator over a byte-iterator that parses each JSON Line
into a given type.
"""

http_response: httpx.Response | None

def __init__(
self, *, raw_iterator: AsyncIterator[bytes], line_type: type[_T], http_response: httpx.Response | None
) -> None:
super().__init__()
self.http_response = http_response
self._raw_iterator = raw_iterator
self._line_type = line_type
self._iterator = self.__decode__()

async def __decode__(self) -> AsyncIterator[_T]:
buf = b""
async for chunk in self._raw_iterator:
for line in chunk.splitlines(keepends=True):
buf += line
if buf.endswith((b"\r", b"\n", b"\r\n")):
yield construct_type_unchecked(
value=json.loads(buf),
type_=self._line_type,
)
buf = b""

# flush
if buf:
yield construct_type_unchecked(
value=json.loads(buf),
type_=self._line_type,
)

async def __anext__(self) -> _T:
return await self._iterator.__anext__()

async def __aiter__(self) -> AsyncIterator[_T]:
async for item in self._iterator:
yield item
Loading

0 comments on commit cd1ffcb

Please sign in to comment.