
Env variables

MAXIM_API_KEY=
MAXIM_LOG_REPO_ID=
OPENAI_API_KEY=
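
A common pattern is to keep these in a local .env file and load them at startup; a minimal sketch using python-dotenv (the sample agent below does the same):

from dotenv import load_dotenv

# Reads MAXIM_API_KEY, MAXIM_LOG_REPO_ID and OPENAI_API_KEY
# from .env into the process environment.
load_dotenv()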

Initialize Maxim logger

from maxim import Maxim

logger = Maxim().logger()
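
With no arguments, Maxim() picks up MAXIM_API_KEY from the environment and logger() uses MAXIM_LOG_REPO_ID. If you would rather pass configuration explicitly, here is a sketch using the dict-style config that also appears later in this guide (the "id" key for the log repository is an assumption):

import os

from maxim import Maxim

logger = Maxim({"api_key": os.getenv("MAXIM_API_KEY")}).logger(
    {"id": os.getenv("MAXIM_LOG_REPO_ID")}  # assumed key name for the log repo ID
)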

Initialize MaximOpenAIClient

import os

from openai import OpenAI
from maxim.logger.openai import MaximOpenAIClient

client = MaximOpenAIClient(
    client=OpenAI(api_key=os.getenv("OPENAI_API_KEY")), logger=logger
)
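
The wrapped client exposes the same surface as the underlying OpenAI client, so existing calls work unchanged and are traced to Maxim automatically. A quick sanity check (the model name here is just an example):

response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Say hello"}],
)
print(response.choices[0].message.content)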

Sample agent using the summarization agent from the OpenAI Cookbook

The following agent builds on the context summarization agent from the OpenAI Cookbook, adding tools for general tasks (weather, current time, and reminders).

import asyncio
import base64
import json
import os
import random
import sys
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Literal

import simpleaudio  # speaker playback
import sounddevice as sd  # microphone capture
from dotenv import load_dotenv

# OpenAI SDK imports
from openai import OpenAI

from maxim import Maxim
from maxim.logger.openai import MaximOpenAIClient

load_dotenv()


# --------------------------------------------------------------------------- #
# Tool Definitions                                                            #
# --------------------------------------------------------------------------- #

TOOLS = [
    {
        "type": "function",
        "name": "get_weather",
        "description": "Get the current weather for a specific location. Use this when the user asks about weather conditions.",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "The city and country, e.g. 'Paris, France' or 'New York, USA'",
                },
                "unit": {
                    "type": "string",
                    "enum": ["celsius", "fahrenheit"],
                    "description": "Temperature unit preference",
                },
            },
            "required": ["location"],
        },
    },
    {
        "type": "function",
        "name": "get_current_time",
        "description": "Get the current date and time. Use this when the user asks what time it is or what the date is.",
        "parameters": {
            "type": "object",
            "properties": {
                "timezone": {
                    "type": "string",
                    "description": "The timezone, e.g. 'UTC', 'America/New_York', 'Europe/Paris'. Defaults to local time if not specified.",
                },
            },
            "required": [],
        },
    },
    {
        "type": "function",
        "name": "set_reminder",
        "description": "Set a reminder for the user. Use this when the user wants to be reminded about something.",
        "parameters": {
            "type": "object",
            "properties": {
                "message": {
                    "type": "string",
                    "description": "The reminder message",
                },
                "time": {
                    "type": "string",
                    "description": "When to remind, e.g. 'in 5 minutes', 'at 3pm', 'tomorrow morning'",
                },
            },
            "required": ["message", "time"],
        },
    },
]


def execute_tool(name: str, arguments: Dict[str, Any]) -> str:
    """Execute a tool and return the result as a string."""
    if name == "get_weather":
        # Simulated weather data
        location = arguments.get("location", "Unknown")
        unit = arguments.get("unit", "celsius")

        # Generate mock weather data
        conditions = ["sunny", "partly cloudy", "cloudy", "rainy", "stormy"]
        condition = random.choice(conditions)
        temp_c = random.randint(5, 35)
        temp = temp_c if unit == "celsius" else int(temp_c * 9 / 5 + 32)
        unit_symbol = "°C" if unit == "celsius" else "°F"
        humidity = random.randint(30, 90)

        return json.dumps(
            {
                "location": location,
                "temperature": f"{temp}{unit_symbol}",
                "condition": condition,
                "humidity": f"{humidity}%",
                "wind": f"{random.randint(5, 30)} km/h",
            }
        )

    elif name == "get_current_time":
        timezone = arguments.get("timezone", "local")
        now = datetime.now()

        return json.dumps(
            {
                "date": now.strftime("%A, %B %d, %Y"),
                "time": now.strftime("%I:%M %p"),
                "timezone": timezone if timezone != "local" else "Local Time",
                "unix_timestamp": int(now.timestamp()),
            }
        )

    elif name == "set_reminder":
        message = arguments.get("message", "")
        time = arguments.get("time", "")

        # In a real app, you'd actually schedule the reminder
        # Here we just acknowledge it
        return json.dumps(
            {
                "status": "success",
                "message": f"Reminder set: '{message}' for {time}",
                "id": f"reminder_{random.randint(1000, 9999)}",
            }
        )

    else:
        return json.dumps({"error": f"Unknown tool: {name}"})


# Set your API keys safely
openai_api_key = os.getenv("OPENAI_API_KEY", "")
if not openai_api_key:
    raise ValueError("OPENAI_API_KEY not found – please set env var or edit this cell.")

maxim_base_url = os.getenv("MAXIM_BASE_URL")

# Audio/config knobs
SAMPLE_RATE_HZ = 24_000  # The Realtime API's pcm16 format uses 24 kHz
CHUNK_DURATION_MS = 40  # chunk size for audio capture
BYTES_PER_SAMPLE = 2  # pcm16 = 2 bytes/sample
SUMMARY_TRIGGER = 2_000  # Summarise when context >= this
KEEP_LAST_TURNS = 2  # Keep these turns verbatim
SUMMARY_MODEL = "gpt-4o-mini"  # Cheaper, fast summariser


@dataclass
class Turn:
    """One utterance in the dialogue (user **or** assistant)."""

    role: Literal["user", "assistant"]
    item_id: str  # Server‑assigned identifier
    text: str | None = None  # Filled once transcript is ready


@dataclass
class ConversationState:
    """All mutable data the session needs — nothing more, nothing less."""

    history: List[Turn] = field(default_factory=list)  # Ordered log
    waiting: dict[str, asyncio.Future] = field(
        default_factory=dict
    )  # Pending transcript fetches
    summary_count: int = 0

    latest_tokens: int = 0  # Window size after last reply
    summarising: bool = False  # Guard so we don't run two summaries at once


def print_history(state: ConversationState) -> None:
    """Pretty-print the running transcript so far."""
    print("—— Conversation so far ———————————————")
    for turn in state.history:
        text_preview = (turn.text or "").strip().replace("\n", " ")
        print(f"[{turn.role:<9}] {text_preview}  ({turn.item_id})")
    print("——————————————————————————————————————————")


async def mic_to_queue(pcm_queue: asyncio.Queue[bytes]) -> None:
    """
    Capture raw PCM‑16 microphone audio and push ~CHUNK_DURATION_MS chunks
    to *pcm_queue* until the surrounding task is cancelled.

    Parameters
    ----------
    pcm_queue : asyncio.Queue[bytes]
        Destination queue for PCM‑16 frames (little‑endian int16).
    """
    blocksize = int(SAMPLE_RATE_HZ * CHUNK_DURATION_MS / 1000)
    loop = asyncio.get_running_loop()

    def _enqueue(chunk: bytes) -> None:
        try:
            pcm_queue.put_nowait(chunk)
        except asyncio.QueueFull:
            # Drop frame if upstream (WebSocket) can't keep up.
            pass

    def _callback(indata, _frames, _time, status):
        if status:  # XRuns, device changes, etc.
            print("⚠️", status, file=sys.stderr)
        # sounddevice invokes this callback on a PortAudio thread, so hand the
        # frame to the asyncio loop thread-safely rather than touching the
        # queue directly.
        loop.call_soon_threadsafe(_enqueue, bytes(indata))

    # RawInputStream is synchronous; wrap in context manager to auto‑close.
    with sd.RawInputStream(
        samplerate=SAMPLE_RATE_HZ,
        blocksize=blocksize,
        dtype="int16",
        channels=1,
        callback=_callback,
    ):
        try:
            # Keep coroutine alive until cancelled by caller.
            await asyncio.Event().wait()
        finally:
            print("⏹️  Mic stream closed.")


# Helper function to encode audio chunks in base64
def b64(blob):
    return base64.b64encode(blob).decode()


async def queue_to_connection(pcm_queue: asyncio.Queue[bytes], connection):
    """Read audio chunks from queue and send to connection."""
    try:
        while (chunk := await pcm_queue.get()) is not None:
            await connection.input_audio_buffer.append(audio=b64(chunk))
    except Exception as e:
        print(f"Connection closed – stopping uploader: {e}")


async def run_summary_llm(text: str, client: OpenAI) -> str:
    """Call a lightweight model to summarise `text`."""
    resp = await asyncio.to_thread(
        lambda: client.chat.completions.create(
            model=SUMMARY_MODEL,
            temperature=0,
            messages=[
                {
                    "role": "system",
                    "content": "Summarise in French the following conversation "
                    "in one concise paragraph so it can be used as "
                    "context for future dialogue.",
                },
                {"role": "user", "content": text},
            ],
        )
    )
    return resp.choices[0].message.content.strip()


async def summarise_and_prune(connection, state, client: OpenAI):
    """Summarise old turns, delete them server‑side, and prepend a single summary
    turn locally + remotely."""
    state.summarising = True
    print(
        f"⚠️  Token window ≈{state.latest_tokens}{SUMMARY_TRIGGER}. Summarising…",
    )
    old_turns, recent_turns = (
        state.history[:-KEEP_LAST_TURNS],
        state.history[-KEEP_LAST_TURNS:],
    )
    convo_text = "\n".join(f"{t.role}: {t.text}" for t in old_turns if t.text)

    if not convo_text:
        print("Nothing to summarise (transcripts still pending).")
        state.summarising = False
        return

    summary_text = await run_summary_llm(convo_text, client)
    state.summary_count += 1
    summary_id = f"sum_{state.summary_count:03d}"
    state.history[:] = [Turn("assistant", summary_id, summary_text)] + recent_turns

    print_history(state)

    # Create summary on server
    await connection.conversation.item.create(
        item={
            "id": summary_id,
            "type": "message",
            "role": "system",
            "content": [{"type": "input_text", "text": summary_text}],
        },
        previous_item_id="root",
    )

    # Delete old items
    for turn in old_turns:
        await connection.conversation.item.delete(item_id=turn.item_id)

    print(f"✅ Summary inserted ({summary_id})")

    state.summarising = False


async def fetch_full_item(
    connection, item_id: str, state: ConversationState, attempts: int = 1
):
    """
    Ask the server for a full conversation item; retry up to 5× if the
    transcript field is still null.  Resolve the waiting future when done.
    """
    # If there is already a pending fetch, just await it
    if item_id in state.waiting:
        return await state.waiting[item_id]

    fut = asyncio.get_running_loop().create_future()
    state.waiting[item_id] = fut

    # Request the item - this sends the request and returns immediately
    # The actual item will come through as a conversation.item.retrieved event
    await connection.conversation.item.retrieve(item_id=item_id)

    # Wait for the event to resolve the future (with timeout)
    try:
        item = await asyncio.wait_for(fut, timeout=2.0)
    except asyncio.TimeoutError:
        # If timeout, retry
        state.waiting.pop(item_id, None)
        if attempts < 5:
            await asyncio.sleep(0.4 * attempts)
            return await fetch_full_item(connection, item_id, state, attempts + 1)
        return None

    # If transcript still missing retry (max 5×)
    content = item.get("content", [{}])
    transcript = content[0].get("transcript") if content else None
    if attempts < 5 and not transcript:
        state.waiting.pop(item_id, None)
        await asyncio.sleep(0.4 * attempts)
        return await fetch_full_item(connection, item_id, state, attempts + 1)

    # Done – remove the marker
    state.waiting.pop(item_id, None)
    return item


# --------------------------------------------------------------------------- #
# Realtime session                                                            #
# --------------------------------------------------------------------------- #
async def realtime_session(
    model="gpt-realtime",
    voice="shimmer",
    enable_playback=True,
):
    """
    Main coroutine: connects to the Realtime endpoint, spawns helper tasks,
    and processes incoming events using the OpenAI SDK.
    """
    state = ConversationState()  # Reset state for each run

    pcm_queue: asyncio.Queue[bytes] = asyncio.Queue()
    assistant_audio: List[bytes] = []

    # Create logger and client
    logger = Maxim({"base_url": maxim_base_url}).logger()
    openai_client = OpenAI(api_key=openai_api_key)
    client = MaximOpenAIClient(client=openai_client, logger=logger).aio

    async with client.realtime.connect(
        model=model,
        extra_headers={
            "maxim-session-name": "Context Summarization with Realtime API SDK",
            "maxim-session-tags": {
                "env": "staging",
                "feature": "context_summarization_with_realtime_api_sdk",
            },
        },
    ) as connection:
        # ------------------------------------------------------------------- #
        # Configure session: voice, modalities, audio formats, transcription  #
        # ------------------------------------------------------------------- #
        await connection.session.update(
            session={
                "model": model,
                "type": "realtime",
                "output_modalities": ["audio"],
                "tracing": "auto",
                "tools": TOOLS,
                "tool_choice": "auto",
                "audio": {
                    "input": {
                        "transcription": {
                            "language": "en",
                            "model": "gpt-4o-transcribe",
                        },
                        "format": {"rate": SAMPLE_RATE_HZ, "type": "audio/pcm"},
                    }
                },
            },
        )
        print("session.created ✅")
        print(f"🔧 Tools available: {[t['name'] for t in TOOLS]}")

        # ------------------------------------------------------------------- #
        # Launch background tasks: mic capture → queue → connection            #
        # ------------------------------------------------------------------- #
        mic_task = asyncio.create_task(mic_to_queue(pcm_queue))
        upl_task = asyncio.create_task(queue_to_connection(pcm_queue, connection))

        print("🎙️ Speak now (Ctrl‑C to quit)…")

        try:
            # ------------------------------------------------------------------- #
            # Main event loop: process incoming events from the connection         #
            # ------------------------------------------------------------------- #
            async for event in connection:
                etype = event.type

                # --------------------------------------------------------------- #
                # User just spoke ⇢ conversation.item.created (role = user)        #
                # --------------------------------------------------------------- #
                if etype == "conversation.item.created":
                    item = event.item
                    if item.role == "user":
                        text = None
                        if item.content:
                            # Extract transcript from content
                            content_item = item.content[0] if item.content else None
                            if content_item:
                                # Handle both dict and object access patterns
                                if isinstance(content_item, dict):
                                    text = content_item.get("transcript")
                                elif hasattr(content_item, "transcript"):
                                    text = content_item.transcript
                                elif hasattr(content_item, "get"):
                                    text = content_item.get("transcript")

                        state.history.append(Turn("user", item.id, text))

                        # If transcript not yet available, fetch it later
                        if text is None:
                            asyncio.create_task(
                                fetch_full_item(connection, item.id, state)
                            )

                # --------------------------------------------------------------- #
                # Transcript fetched ⇢ conversation.item.retrieved                 #
                # --------------------------------------------------------------- #
                elif etype == "conversation.item.retrieved":
                    item = event.item

                    # Extract transcript from content
                    transcript = None
                    if item.content:
                        content_item = item.content[0] if item.content else None
                        if content_item:
                            # Handle both dict and object access patterns
                            if isinstance(content_item, dict):
                                transcript = content_item.get("transcript")
                            elif hasattr(content_item, "transcript"):
                                transcript = content_item.transcript
                            elif hasattr(content_item, "get"):
                                transcript = content_item.get("transcript")

                    # Fill missing transcript in history
                    for t in state.history:
                        if t.item_id == item.id:
                            t.text = transcript
                            break

                    # Resolve the future if one exists
                    item_id = item.id
                    if item_id in state.waiting:
                        # Convert item to dict-like structure matching original format
                        # This matches what fetch_full_item expects
                        item_dict = {
                            "id": item.id,
                            "content": [{"transcript": transcript}]
                            if transcript
                            else [],
                        }
                        state.waiting[item_id].set_result(item_dict)
                        state.waiting.pop(item_id, None)

                # --------------------------------------------------------------- #
                # Assistant audio arrives in deltas                               #
                # --------------------------------------------------------------- #
                elif etype == "response.output_audio.delta":
                    assistant_audio.append(base64.b64decode(event.delta))

                # --------------------------------------------------------------- #
                # Function call completed - execute tool and send result          #
                # --------------------------------------------------------------- #
                elif etype == "response.function_call_arguments.done":
                    call_id = event.call_id
                    function_name = event.name
                    arguments_str = event.arguments

                    print(f"🔧 Tool call: {function_name}({arguments_str})")

                    # Parse arguments and execute the tool
                    try:
                        arguments = json.loads(arguments_str) if arguments_str else {}
                    except json.JSONDecodeError:
                        arguments = {}

                    result = execute_tool(function_name, arguments)
                    print(f"   → Result: {result}")

                    # Send the function call output back to the conversation
                    await connection.conversation.item.create(
                        item={
                            "type": "function_call_output",
                            "call_id": call_id,
                            "output": result,
                        }
                    )

                    # Trigger a new response to continue the conversation
                    await connection.response.create()

                # --------------------------------------------------------------- #
                # Assistant reply finished ⇢ response.done                        #
                # --------------------------------------------------------------- #
                elif etype == "response.done":
                    response = event.response

                    # Extract assistant messages and tokens
                    if response.output:
                        for output_item in response.output:
                            if (
                                output_item.type == "message"
                                and output_item.role == "assistant"
                            ):
                                content_item = (
                                    output_item.content[0]
                                    if output_item.content
                                    else None
                                )
                                txt = None
                                if content_item:
                                    # Handle both dict and object access patterns
                                    if isinstance(content_item, dict):
                                        txt = content_item.get("transcript")
                                    elif hasattr(content_item, "transcript"):
                                        txt = content_item.transcript
                                    elif hasattr(content_item, "get"):
                                        txt = content_item.get("transcript")

                                if txt:
                                    state.history.append(
                                        Turn("assistant", output_item.id, txt)
                                    )

                    # Extract token usage
                    if response.usage:
                        state.latest_tokens = response.usage.total_tokens

                    print(
                        f"—— response.done  (window ≈{state.latest_tokens} tokens) ——"
                    )
                    print_history(state)

                    # Fetch any still‑missing user transcripts
                    for turn in state.history:
                        if (
                            turn.role == "user"
                            and turn.text is None
                            and turn.item_id not in state.waiting
                        ):
                            asyncio.create_task(
                                fetch_full_item(connection, turn.item_id, state)
                            )

                    # Playback collected audio once reply completes
                    if enable_playback and assistant_audio:
                        simpleaudio.play_buffer(
                            b"".join(assistant_audio),
                            1,
                            BYTES_PER_SAMPLE,
                            SAMPLE_RATE_HZ,
                        )
                        assistant_audio.clear()

                    # Summarise if context too large – fire in background so we don't block dialogue
                    if (
                        state.latest_tokens >= SUMMARY_TRIGGER
                        and len(state.history) > KEEP_LAST_TURNS
                        and not state.summarising
                    ):
                        asyncio.create_task(
                            summarise_and_prune(connection, state, openai_client)
                        )

        except KeyboardInterrupt:
            print("\nStopping…")
        finally:
            mic_task.cancel()
            await pcm_queue.put(None)
            await upl_task


def main():
    """Run the realtime session."""
    asyncio.run(realtime_session())


if __name__ == "__main__":
    main()

You can view the trace of the above agent's interactions on the Maxim dashboard, which provides detailed insights and visualizations of the entire process.

Additional headers

You can pass any of the following headers to the Realtime API connection to enrich the resulting sessions:
  • maxim-session-name: Custom name for the session to be created.
  • maxim-session-tags: A map of tags for the session to be created.
  • maxim-session-id: Custom ID for the session to be created.
Pass these via the extra_headers parameter of the connect method:

async with client.realtime.connect(
    model=model,
    extra_headers={
        "maxim-session-name": "Context Summarization with Realtime API SDK",
        "maxim-session-tags": {
            "env": "staging",
            "feature": "context_summarization_with_realtime_api_sdk",
        },
    },
) as connection:
    ...
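
For example, to also pin the session to a custom ID (the ID value below is purely illustrative):

async with client.realtime.connect(
    model=model,
    extra_headers={
        "maxim-session-id": "support-call-0042",  # hypothetical custom session ID
        "maxim-session-name": "Context Summarization with Realtime API SDK",
    },
) as connection:
    ...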