Env variables
MAXIM_API_KEY=
MAXIM_LOG_REPO_ID=
OPENAI_API_KEY=
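Store these in a .env file at your project root. The sample agent below loads them with python-dotenv (assuming the package is installed):
from dotenv import load_dotenv
load_dotenv()  # reads MAXIM_API_KEY, MAXIM_LOG_REPO_ID, OPENAI_API_KEY from .env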
Initialize Maxim logger
from maxim import Maxim
logger = Maxim().logger()
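Maxim() with no arguments picks up MAXIM_API_KEY and MAXIM_LOG_REPO_ID from the environment. If you prefer explicit configuration, here is a sketch assuming the config dict follows the same pattern as the {"base_url": ...} usage in the sample agent below:
import os
from maxim import Maxim
# Assumed config keys, mirroring the {"base_url": ...} dict used later in this guide
maxim = Maxim({"api_key": os.getenv("MAXIM_API_KEY")})
logger = maxim.logger({"id": os.getenv("MAXIM_LOG_REPO_ID")})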
Initialize MaximOpenAIClient
import os
from openai import OpenAI
from maxim.logger.openai import MaximOpenAIClient
client = MaximOpenAIClient(client=OpenAI(api_key=os.getenv("OPENAI_API_KEY")), logger=logger)
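The wrapped client is a drop-in replacement for the regular OpenAI client, so calls made through it are traced to your Maxim log repository automatically. A minimal sketch (the model name here is illustrative):
response = client.chat.completions.create(
    model="gpt-4o-mini",  # illustrative; any chat model works
    messages=[{"role": "user", "content": "Hello!"}],
)
print(response.choices[0].message.content)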
Sample agent using the Context summarization agent from the OpenAI cookbook
The following agent uses the Context summarization agent from the OpenAI cookbook, extended with additional tools for general tasks.
import asyncio
import base64
import json
import os
import random
import sys
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Literal
import simpleaudio # speaker playback
import sounddevice as sd # microphone capture
from dotenv import load_dotenv
# OpenAI SDK imports
from openai import OpenAI
from maxim import Maxim
from maxim.logger.openai import MaximOpenAIClient
load_dotenv()
# --------------------------------------------------------------------------- #
# Tool Definitions #
# --------------------------------------------------------------------------- #
TOOLS = [
{
"type": "function",
"name": "get_weather",
"description": "Get the current weather for a specific location. Use this when the user asks about weather conditions.",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and country, e.g. 'Paris, France' or 'New York, USA'",
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "Temperature unit preference",
},
},
"required": ["location"],
},
},
{
"type": "function",
"name": "get_current_time",
"description": "Get the current date and time. Use this when the user asks what time it is or what the date is.",
"parameters": {
"type": "object",
"properties": {
"timezone": {
"type": "string",
"description": "The timezone, e.g. 'UTC', 'America/New_York', 'Europe/Paris'. Defaults to local time if not specified.",
},
},
"required": [],
},
},
{
"type": "function",
"name": "set_reminder",
"description": "Set a reminder for the user. Use this when the user wants to be reminded about something.",
"parameters": {
"type": "object",
"properties": {
"message": {
"type": "string",
"description": "The reminder message",
},
"time": {
"type": "string",
"description": "When to remind, e.g. 'in 5 minutes', 'at 3pm', 'tomorrow morning'",
},
},
"required": ["message", "time"],
},
},
]
def execute_tool(name: str, arguments: Dict[str, Any]) -> str:
"""Execute a tool and return the result as a string."""
if name == "get_weather":
# Simulated weather data
location = arguments.get("location", "Unknown")
unit = arguments.get("unit", "celsius")
# Generate mock weather data
conditions = ["sunny", "partly cloudy", "cloudy", "rainy", "stormy"]
condition = random.choice(conditions)
temp_c = random.randint(5, 35)
temp = temp_c if unit == "celsius" else int(temp_c * 9 / 5 + 32)
unit_symbol = "°C" if unit == "celsius" else "°F"
humidity = random.randint(30, 90)
return json.dumps(
{
"location": location,
"temperature": f"{temp}{unit_symbol}",
"condition": condition,
"humidity": f"{humidity}%",
"wind": f"{random.randint(5, 30)} km/h",
}
)
elif name == "get_current_time":
timezone = arguments.get("timezone", "local")
now = datetime.now()
return json.dumps(
{
"date": now.strftime("%A, %B %d, %Y"),
"time": now.strftime("%I:%M %p"),
"timezone": timezone if timezone != "local" else "Local Time",
"unix_timestamp": int(now.timestamp()),
}
)
elif name == "set_reminder":
message = arguments.get("message", "")
time = arguments.get("time", "")
# In a real app, you'd actually schedule the reminder
# Here we just acknowledge it
return json.dumps(
{
"status": "success",
"message": f"Reminder set: '{message}' for {time}",
"id": f"reminder_{random.randint(1000, 9999)}",
}
)
else:
return json.dumps({"error": f"Unknown tool: {name}"})
# Set your API keys safely
openaiapikey = os.getenv("OPENAI_API_KEY", "")
if not openaiapikey:
raise ValueError("OPENAI_API_KEY not found – please set env var or edit this cell.")
apikey = os.getenv("MAXIM_API_KEY")
baseURL = os.getenv("MAXIM_BASE_URL")
# Audio/config knobs
SAMPLE_RATE_HZ = 24_000 # Realtime API pcm16 audio is 24 kHz
CHUNK_DURATION_MS = 40 # chunk size for audio capture
BYTES_PER_SAMPLE = 2 # pcm16 = 2 bytes/sample
SUMMARY_TRIGGER = 2_000 # Summarise when context >= this
KEEP_LAST_TURNS = 2 # Keep these turns verbatim
SUMMARY_MODEL = "gpt-4o-mini" # Cheaper, fast summariser
@dataclass
class Turn:
"""One utterance in the dialogue (user **or** assistant)."""
role: Literal["user", "assistant"]
item_id: str # Server‑assigned identifier
text: str | None = None # Filled once transcript is ready
@dataclass
class ConversationState:
"""All mutable data the session needs — nothing more, nothing less."""
history: List[Turn] = field(default_factory=list) # Ordered log
waiting: dict[str, asyncio.Future] = field(
default_factory=dict
) # Pending transcript fetches
summary_count: int = 0
latest_tokens: int = 0 # Window size after last reply
summarising: bool = False # Guard so we don't run two summaries at once
def print_history(state) -> None:
"""Pretty-print the running transcript so far."""
print("—— Conversation so far ———————————————")
for turn in state.history:
text_preview = (turn.text or "").strip().replace("\n", " ")
print(f"[{turn.role:<9}] {text_preview} ({turn.item_id})")
print("——————————————————————————————————————————")
async def mic_to_queue(pcm_queue: asyncio.Queue[bytes]) -> None:
"""
Capture raw PCM‑16 microphone audio and push ~CHUNK_DURATION_MS chunks
to *pcm_queue* until the surrounding task is cancelled.
Parameters
----------
pcm_queue : asyncio.Queue[bytes]
Destination queue for PCM‑16 frames (little‑endian int16).
"""
blocksize = int(SAMPLE_RATE_HZ * CHUNK_DURATION_MS / 1000)
def _callback(indata, _frames, _time, status):
if status: # XRuns, device changes, etc.
print("⚠️", status, file=sys.stderr)
try:
pcm_queue.put_nowait(bytes(indata)) # 1‑shot enqueue
except asyncio.QueueFull:
# Drop frame if upstream (WebSocket) can't keep up.
pass
# RawInputStream is synchronous; wrap in context manager to auto‑close.
with sd.RawInputStream(
samplerate=SAMPLE_RATE_HZ,
blocksize=blocksize,
dtype="int16",
channels=1,
callback=_callback,
):
try:
# Keep coroutine alive until cancelled by caller.
await asyncio.Event().wait()
finally:
print("⏹️ Mic stream closed.")
# Helper function to encode audio chunks in base64
def b64(blob):
return base64.b64encode(blob).decode()
async def queue_to_connection(pcm_queue: asyncio.Queue[bytes], connection):
"""Read audio chunks from queue and send to connection."""
try:
while (chunk := await pcm_queue.get()) is not None:
await connection.input_audio_buffer.append(audio=b64(chunk))
except Exception as e:
print(f"Connection closed – stopping uploader: {e}")
async def run_summary_llm(text: str, client: OpenAI) -> str:
"""Call a lightweight model to summarise `text`."""
resp = await asyncio.to_thread(
lambda: client.chat.completions.create(
model=SUMMARY_MODEL,
temperature=0,
messages=[
{
"role": "system",
"content": "Summarise in French the following conversation "
"in one concise paragraph so it can be used as "
"context for future dialogue.",
},
{"role": "user", "content": text},
],
)
)
return resp.choices[0].message.content.strip()
async def summarise_and_prune(connection, state, client: OpenAI):
"""Summarise old turns, delete them server‑side, and prepend a single summary
turn locally + remotely."""
state.summarising = True
print(
f"⚠️ Token window ≈{state.latest_tokens} ≥ {SUMMARY_TRIGGER}. Summarising…",
)
old_turns, recent_turns = (
state.history[:-KEEP_LAST_TURNS],
state.history[-KEEP_LAST_TURNS:],
)
convo_text = "\n".join(f"{t.role}: {t.text}" for t in old_turns if t.text)
if not convo_text:
print("Nothing to summarise (transcripts still pending).")
state.summarising = False
return
summary_text = await run_summary_llm(convo_text, client) if convo_text else ""
state.summary_count += 1
summary_id = f"sum_{state.summary_count:03d}"
state.history[:] = [Turn("assistant", summary_id, summary_text)] + recent_turns
print_history(state)
# Create summary on server
await connection.conversation.item.create(
item={
"id": summary_id,
"type": "message",
"role": "system",
"content": [{"type": "input_text", "text": summary_text}],
},
previous_item_id="root",
)
# Delete old items
for turn in old_turns:
await connection.conversation.item.delete(item_id=turn.item_id)
print(f"✅ Summary inserted ({summary_id})")
state.summarising = False
async def fetch_full_item(
connection, item_id: str, state: ConversationState, attempts: int = 1
):
"""
Ask the server for a full conversation item; retry up to 5× if the
transcript field is still null. Resolve the waiting future when done.
"""
# If there is already a pending fetch, just await it
if item_id in state.waiting:
return await state.waiting[item_id]
fut = asyncio.get_running_loop().create_future()
state.waiting[item_id] = fut
# Request the item - this sends the request and returns immediately
# The actual item will come through as a conversation.item.retrieved event
await connection.conversation.item.retrieve(item_id=item_id)
# Wait for the event to resolve the future (with timeout)
try:
item = await asyncio.wait_for(fut, timeout=2.0)
except asyncio.TimeoutError:
# If timeout, retry
state.waiting.pop(item_id, None)
if attempts < 5:
await asyncio.sleep(0.4 * attempts)
return await fetch_full_item(connection, item_id, state, attempts + 1)
return None
# If transcript still missing retry (max 5×)
content = item.get("content", [{}])
transcript = content[0].get("transcript") if content else None
if attempts < 5 and not transcript:
state.waiting.pop(item_id, None)
await asyncio.sleep(0.4 * attempts)
return await fetch_full_item(connection, item_id, state, attempts + 1)
# Done – remove the marker
state.waiting.pop(item_id, None)
return item
# --------------------------------------------------------------------------- #
# Realtime session #
# --------------------------------------------------------------------------- #
async def realtime_session(
model="gpt-realtime",
voice="shimmer",
enable_playback=True,
):
"""
Main coroutine: connects to the Realtime endpoint, spawns helper tasks,
and processes incoming events using the OpenAI SDK.
"""
state = ConversationState() # Reset state for each run
pcm_queue: asyncio.Queue[bytes] = asyncio.Queue()
assistant_audio: List[bytes] = []
# Create logger and client
logger = Maxim({"base_url": baseURL}).logger()
openai_client = OpenAI(api_key=openaiapikey)
client = MaximOpenAIClient(openai_client, logger=logger).aio
async with client.realtime.connect(
model=model,
extra_headers={
"maxim-session-name": "Context Summarization with Realtime API SDK",
"maxim-session-tags": {
"env": "staging",
"feature": "context_summarization_with_realtime_api_sdk",
},
},
) as connection:
# ------------------------------------------------------------------- #
# Configure session: voice, modalities, audio formats, transcription #
# ------------------------------------------------------------------- #
await connection.session.update(
session={
"model": model,
"type": "realtime",
"output_modalities": ["audio"],
"tracing": "auto",
"tools": TOOLS,
"tool_choice": "auto",
"audio": {
"input": {
"transcription": {
"language": "en",
"model": "gpt-4o-transcribe",
},
"format": {"rate": SAMPLE_RATE_HZ, "type": "audio/pcm"},
}
},
},
)
print("session.created ✅")
print(f"🔧 Tools available: {[t['name'] for t in TOOLS]}")
# ------------------------------------------------------------------- #
# Launch background tasks: mic capture → queue → connection #
# ------------------------------------------------------------------- #
mic_task = asyncio.create_task(mic_to_queue(pcm_queue))
upl_task = asyncio.create_task(queue_to_connection(pcm_queue, connection))
print("🎙️ Speak now (Ctrl‑C to quit)…")
try:
# ------------------------------------------------------------------- #
# Main event loop: process incoming events from the connection #
# ------------------------------------------------------------------- #
async for event in connection:
etype = event.type
# --------------------------------------------------------------- #
# User just spoke ⇢ conversation.item.created (role = user) #
# --------------------------------------------------------------- #
if etype == "conversation.item.created":
item = event.item
if item.role == "user":
text = None
if item.content:
# Extract transcript from content
content_item = item.content[0] if item.content else None
if content_item:
# Handle both dict and object access patterns
if isinstance(content_item, dict):
text = content_item.get("transcript")
elif hasattr(content_item, "transcript"):
text = content_item.transcript
elif hasattr(content_item, "get"):
text = content_item.get("transcript")
state.history.append(Turn("user", item.id, text))
# If transcript not yet available, fetch it later
if text is None:
asyncio.create_task(
fetch_full_item(connection, item.id, state)
)
# --------------------------------------------------------------- #
# Transcript fetched ⇢ conversation.item.retrieved #
# --------------------------------------------------------------- #
elif etype == "conversation.item.retrieved":
item = event.item
# Extract transcript from content
transcript = None
if item.content:
content_item = item.content[0] if item.content else None
if content_item:
# Handle both dict and object access patterns
if isinstance(content_item, dict):
transcript = content_item.get("transcript")
elif hasattr(content_item, "transcript"):
transcript = content_item.transcript
elif hasattr(content_item, "get"):
transcript = content_item.get("transcript")
# Fill missing transcript in history
for t in state.history:
if t.item_id == item.id:
t.text = transcript
break
# Resolve the future if one exists
item_id = item.id
if item_id in state.waiting:
# Convert item to dict-like structure matching original format
# This matches what fetch_full_item expects
item_dict = {
"id": item.id,
"content": [{"transcript": transcript}]
if transcript
else [],
}
state.waiting[item_id].set_result(item_dict)
state.waiting.pop(item_id, None)
# --------------------------------------------------------------- #
# Assistant audio arrives in deltas #
# --------------------------------------------------------------- #
elif etype == "response.output_audio.delta":
assistant_audio.append(base64.b64decode(event.delta))
# --------------------------------------------------------------- #
# Function call completed - execute tool and send result #
# --------------------------------------------------------------- #
elif etype == "response.function_call_arguments.done":
call_id = event.call_id
function_name = event.name
arguments_str = event.arguments
print(f"🔧 Tool call: {function_name}({arguments_str})")
# Parse arguments and execute the tool
try:
arguments = json.loads(arguments_str) if arguments_str else {}
except json.JSONDecodeError:
arguments = {}
result = execute_tool(function_name, arguments)
print(f" → Result: {result}")
# Send the function call output back to the conversation
await connection.conversation.item.create(
item={
"type": "function_call_output",
"call_id": call_id,
"output": result,
}
)
# Trigger a new response to continue the conversation
await connection.response.create()
# --------------------------------------------------------------- #
# Assistant reply finished ⇢ response.done #
# --------------------------------------------------------------- #
elif etype == "response.done":
response = event.response
# Extract assistant messages and tokens
if response.output:
for output_item in response.output:
if (
output_item.type == "message"
and output_item.role == "assistant"
):
content_item = (
output_item.content[0]
if output_item.content
else None
)
txt = None
if content_item:
# Handle both dict and object access patterns
if isinstance(content_item, dict):
txt = content_item.get("transcript")
elif hasattr(content_item, "transcript"):
txt = content_item.transcript
elif hasattr(content_item, "get"):
txt = content_item.get("transcript")
if txt:
state.history.append(
Turn("assistant", output_item.id, txt)
)
# Extract token usage
if response.usage:
state.latest_tokens = response.usage.total_tokens
print(
f"—— response.done (window ≈{state.latest_tokens} tokens) ——"
)
print_history(state)
# Fetch any still‑missing user transcripts
for turn in state.history:
if (
turn.role == "user"
and turn.text is None
and turn.item_id not in state.waiting
):
asyncio.create_task(
fetch_full_item(connection, turn.item_id, state)
)
# Playback collected audio once reply completes
if enable_playback and assistant_audio:
simpleaudio.play_buffer(
b"".join(assistant_audio),
1,
BYTES_PER_SAMPLE,
SAMPLE_RATE_HZ,
)
assistant_audio.clear()
# Summarise if context too large – fire in background so we don't block dialogue
if (
state.latest_tokens >= SUMMARY_TRIGGER
and len(state.history) > KEEP_LAST_TURNS
and not state.summarising
):
asyncio.create_task(
summarise_and_prune(connection, state, openai_client)
)
except KeyboardInterrupt:
print("\nStopping…")
finally:
mic_task.cancel()
await pcm_queue.put(None)
await upl_task
def main():
"""Run the realtime session."""
asyncio.run(realtime_session())
if __name__ == "__main__":
main()
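To run the agent headless or with a different voice, call realtime_session directly with its keyword arguments (all defined in the script above):
import asyncio
# Disable speaker playback; model and voice shown are the script's defaults
asyncio.run(realtime_session(model="gpt-realtime", voice="shimmer", enable_playback=False))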
Additional headers
You can pass any of the following headers to the Realtime API connection to add extra enrichment to the sessions:
maxim-session-name: Custom name for the session to be created.
maxim-session-tags: A map of tags for the session to be created.
maxim-session-id: Custom ID for the session to be created.
Pass these via the extra_headers parameter of the connect method:
async with client.realtime.connect(
model=model,
extra_headers={
"maxim-session-name": "Context Summarization with Realtime API SDK",
"maxim-session-tags": {
"env": "staging",
"feature": "context_summarization_with_realtime_api_sdk",
},
},
) as connection:
...
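For example, to group multiple runs under one session, pass a stable maxim-session-id alongside the name and tags (the ID value below is illustrative):
async with client.realtime.connect(
    model=model,
    extra_headers={
        "maxim-session-id": "support-call-1234",  # illustrative custom ID
        "maxim-session-name": "Context Summarization with Realtime API SDK",
        "maxim-session-tags": {"env": "staging"},
    },
) as connection:
    ...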