Python SDK Reference

Complete API reference for the AIM Python SDK. The SDK provides a high-level, Pythonic interface for agent registration, capability management, MCP integration, and secure operations.

v1.21.0StablePython 3.8+

Installation

Install via pip: The AIM SDK is available on PyPI. Use pip install aim-sdk then aim-sdk login to authenticate.

# Install from PyPI
pip install aim-sdk

# Login to AIM (opens browser for secure OAuth authentication)
aim-sdk login                              # AIM Cloud (aim.opena2a.org)
aim-sdk login --url http://localhost:8080  # Self-hosted

# Check authentication status
aim-sdk status

# Start using the SDK
python -c "from aim_sdk import secure; agent = secure('my-agent')"

# The aim-sdk login command:
# - Opens your browser for OAuth authentication (Google, etc.)
# - Uses PKCE (Proof Key for Code Exchange) for security
# - Saves credentials to ~/.aim/sdk_credentials.json
# - No password prompts or permission dialogs

Authentication:

  1. 1. Run aim-sdk login
  2. 2. Browser opens for secure OAuth authentication
  3. 3. Sign in with Google (or other provider)
  4. 4. Credentials saved automatically - start coding!

Quick Start

from aim_sdk import secure

# Zero-configuration mode (uses embedded credentials)
agent = secure("my-agent")

# The SDK automatically:
# - Registers the agent with AIM
# - Generates Ed25519 keypair
# - Manages authentication tokens
# - Auto-detects agent type from imports
# - Auto-detects capabilities

# Secure a function with capability verification
@agent.perform_action("db:read", resource="users_table")
def get_users():
    return database.query("SELECT * FROM users")

# The decorator verifies capability with AIM before execution
result = get_users()

Core Classes & Functions

Module Imports

# Core Classes and Functions

from aim_sdk import (
    secure,                # One-line agent registration (alias for register_agent)
    AIMClient,             # Main client class (returned by secure())
    AgentType,             # Agent type constants (LANGCHAIN, CLAUDE, GPT, etc.)

    # Exceptions
    AIMError,              # Base exception class
    AuthenticationError,   # Authentication failures
    VerificationError,     # Verification failures
    ActionDeniedError,     # Capability denied by AIM

    # Auto-detection utilities
    auto_detect_capabilities,  # Detect capabilities from code
    auto_detect_agent_type,    # Detect agent type from imports

    # Security logging (SOC/SIEM integration)
    SecurityLogger,        # Structured security event logger
    configure_security_logging,  # Configure from environment

    # Credential management
    load_sdk_credentials,  # Load saved OAuth credentials
    save_sdk_credentials,  # Save OAuth credentials

    # A2A (Agent-to-Agent) protocol
    A2AClient,             # Agent-to-agent communication
    A2AAgentCard,          # Agent identity card for A2A
)

AIMClient Class Methods

# AIMClient Class Methods (returned by secure())

class AIMClient:
    """Main client class for interacting with AIM"""

    # Capability Verification
    def verify_capability(
        self,
        capability: str,
        resource: str = None,
        context: Dict = None,
        timeout_seconds: int = 300
    ) -> Dict:
        """Request verification for a capability from AIM.
        Returns dict with: verified, verification_id, approved_by, expires_at"""

    def perform_action(
        self,
        capability: str = None,
        resource: str = None,
        context: Dict = None,
        timeout_seconds: int = 300,
        jit_access: bool = False,
        risk_level: str = None,
        auto_register: bool = True
    ):
        """Decorator for automatic capability verification and action tracking.
        Verifies with AIM before execution, logs results after."""

    # Capability Management
    def register_capability(
        self,
        capability_type: str,
        description: str = "",
        risk_level: str = "medium"
    ) -> Dict:
        """Register a capability this agent intends to use."""

    def request_capability(
        self,
        capability_type: str,
        reason: str,
        metadata: Dict = None
    ) -> Dict:
        """Request an additional capability (requires admin approval)."""

    # Agent Management (programmatic)
    def create_new_agent(
        self,
        name: str,
        display_name: str = None,
        description: str = None,
        agent_type: str = "ai_agent",
        capabilities: List[str] = None,
        mcp_servers: List[str] = None
    ) -> Dict:
        """Create/register a new agent through an authenticated client."""

    def list_agents(self) -> Dict:
        """List all agents in the organization."""

    def get_agent_details(self, agent_id: str) -> Dict:
        """Get detailed information about a specific agent."""

    def update_agent(self, display_name: str = None, ...) -> Dict:
        """Update agent properties."""

    def delete_agent(self, agent_id: str) -> Dict:
        """Delete an agent."""

Decorators & Clean Code

Decorator-Based API

# @perform_action Decorator for Clean Code

from aim_sdk import secure

agent = secure("data-processor")

# Simple usage - function name becomes the capability
@agent.perform_action()
def search_flights(destination):
    """Capability auto-detected as 'search_flights'"""
    return api.search(destination)

# Explicit capability with resource tracking
@agent.perform_action("db:read", resource="users_table")
def get_users():
    """Verified against AIM before execution"""
    return database.query("SELECT * FROM users")

# JIT Access - requires real-time admin approval
@agent.perform_action(
    capability="payment:refund",
    jit_access=True,
    risk_level="critical"
)
def process_refund(order_id: str, amount: float):
    """Waits for admin approval in AIM dashboard before executing"""
    return stripe.refund(order_id, amount)

# Risk levels: "low", "medium", "high", "critical"
# Auto-detected from capability name if not specified
@agent.perform_action("file:write", risk_level="medium")
def save_report(path: str, data: dict):
    """Risk level determines approval requirements"""
    with open(path, "w") as f:
        json.dump(data, f)

# Usage - decorators handle verification transparently
result = get_users()              # Verified, then executed
refund = process_refund("ord_123", 50.00)  # Waits for approval

Configuration

Configuration Options

# Configuration Options

from aim_sdk import secure, AIMClient, AgentType

# -----------------------------------------------
# 1. Zero-Config Mode (Recommended - uses OAuth)
# -----------------------------------------------
# Run "aim-sdk login" first, then:
agent = secure("my-agent")
# Credentials loaded from ~/.aim/sdk_credentials.json

# -----------------------------------------------
# 2. With API Key
# -----------------------------------------------
agent = secure("my-agent", api_key="aim_abc123")

# -----------------------------------------------
# 3. Explicit Agent Type (skip auto-detection)
# -----------------------------------------------
agent = secure(
    "my-agent",
    agent_type=AgentType.LANGCHAIN,
    auto_detect=False,
    capabilities=["db:read", "api:call"]
)

# -----------------------------------------------
# 4. With MCP Servers (from Claude Desktop config)
# -----------------------------------------------
agent = secure(
    "my-agent",
    mcp_servers=["github", "filesystem"]  # Must be in Claude Desktop config
)

# -----------------------------------------------
# 5. Full Manual Configuration (AIMClient directly)
# -----------------------------------------------
client = AIMClient(
    agent_id="550e8400-e29b-41d4-a716-446655440000",
    public_key="base64-ed25519-public-key",
    private_key="base64-ed25519-private-key",
    aim_url="https://aim.example.com",
    timeout=30,           # HTTP request timeout (seconds)
    auto_retry=True,      # Auto-retry failed requests
    max_retries=3         # Max retry attempts
)

# -----------------------------------------------
# 6. Custom AIM Server URL
# -----------------------------------------------
agent = secure(
    "my-agent",
    aim_url="http://localhost:8080"  # Self-hosted AIM
)

Async/Await Support

# Using perform_action with Async Functions

from aim_sdk import secure

agent = secure("my-agent")

# The @perform_action decorator works with regular functions.
# Capability verification is synchronous (HTTP request to AIM),
# but your decorated function executes normally after approval.

@agent.perform_action("db:read", resource="users_table")
def get_users():
    """Verification happens before execution"""
    return database.query("SELECT * FROM users")

# For async workflows, use verify_capability() directly
import asyncio

async def process_data_async():
    # Verify capability first (synchronous call to AIM)
    result = agent.verify_capability(
        capability="data:process",
        resource="dataset_v2"
    )

    if result["verified"]:
        # Run your async logic after verification
        data = await fetch_data()
        processed = await transform(data)
        return processed

# Agent management methods work in any context
agents = agent.list_agents()
for a in agents.get("agents", []):
    print(f"{a['name']}: {a['status']}")

# JIT access with timeout (waits for admin approval)
@agent.perform_action(
    capability="system:admin",
    jit_access=True,
    timeout_seconds=600  # Wait up to 10 minutes for approval
)
def admin_operation():
    return system.execute_admin_task()

Error Handling

Exception Hierarchy

# Error Handling

from aim_sdk import (
    secure,
    AIMError,              # Base exception for all AIM errors
    AuthenticationError,   # Auth token or credential issues
    VerificationError,     # Verification request failures
    ActionDeniedError,     # Capability explicitly denied by AIM
)
from aim_sdk.exceptions import ConfigurationError  # SDK misconfiguration

# Registration errors
try:
    agent = secure("my-agent")
except ConfigurationError as e:
    # SDK is misconfigured (missing credentials, invalid URL)
    print(f"Configuration error: {e}")
except AuthenticationError as e:
    # Authentication failed (bad credentials, expired OAuth token)
    # Fix: run "aim-sdk login" to refresh OAuth credentials
    print(f"Auth error: {e}")

# Capability verification errors
try:
    @agent.perform_action("db:write", resource="users_table")
    def update_user(user_id, data):
        return database.update(user_id, data)

    update_user("user-123", {"name": "Alice"})

except ActionDeniedError as e:
    # AIM explicitly denied the capability
    # Agent does not have permission for this action
    print(f"Action denied: {e}")

except VerificationError as e:
    # Verification request failed (network, timeout, server error)
    print(f"Verification failed: {e}")

except AIMError as e:
    # Base exception - catches all AIM SDK errors
    print(f"AIM error: {e}")

# Direct capability verification with error handling
try:
    result = agent.verify_capability(
        capability="email:send",
        resource="admin@example.com"
    )
    if result["verified"]:
        send_email("admin@example.com", "Report ready")
except ActionDeniedError:
    print("Not authorized to send emails")

MCP Server Integration

# MCP Server Integration

from aim_sdk import secure

# Register agent with MCP servers from Claude Desktop config
# The SDK auto-discovers server URLs and capabilities from Claude config
agent = secure(
    "mcp-agent",
    mcp_servers=["github", "filesystem", "data-processor"]
)

# Or auto-detect all MCP servers from Claude Desktop config
agent = secure("mcp-agent", auto_detect_mcp=True)

# Manual MCP server registration via the integration module
from aim_sdk.integrations.mcp.registration import register_mcp_server

server_info = register_mcp_server(
    aim_client=agent,
    server_name="research-mcp",
    server_url="http://localhost:3000",
    public_key="base64-ed25519-public-key",
    capabilities=["tools", "resources"],
    description="Research assistant MCP server",
    version="1.0.0"
)
print(f"Server registered: {server_info['id']}")
print(f"Trust score: {server_info['trust_score']}")

# MCP auto-detection utilities
from aim_sdk import auto_detect_mcps, MCPDetector

# Detect MCP servers from environment
detected = auto_detect_mcps()
for server in detected:
    print(f"Found MCP server: {server['name']}")
    print(f"  Command: {server.get('command', 'N/A')}")
    print(f"  Capabilities: {server.get('capabilities', [])}")

# Track MCP tool calls for audit logging
from aim_sdk import track_mcp_call
track_mcp_call("github", "create_issue", {"title": "Bug fix"})

CLI Tools

Command-Line Interface

# CLI Tools (aim-sdk command)

# Login to AIM (opens browser for OAuth authentication)
$ aim-sdk login
Opening browser for authentication...
Login successful! Credentials saved to ~/.aim/sdk_credentials.json

# Login to self-hosted AIM server
$ aim-sdk login --url http://localhost:8080
Opening browser for authentication at http://localhost:8080...
Login successful!

# Force re-authentication
$ aim-sdk login --force

# Check authentication status
$ aim-sdk status
AIM SDK Status:
  Server: https://aim.opena2a.org
  User: user@example.com
  Organization: My Org
  Token: Valid (expires in 23h)
  SDK Token: Active

# Show SDK version
$ aim-sdk version
aim-sdk 1.21.0

# Logout and clear credentials
$ aim-sdk logout
Credentials cleared from ~/.aim/sdk_credentials.json

# After login, use the SDK in Python:
$ python -c "
from aim_sdk import secure
agent = secure('my-agent')
print(f'Agent registered: {agent.agent_id}')
"

Testing & Development

# Testing & Development

import pytest
from unittest.mock import patch, MagicMock
from aim_sdk import AIMClient, AIMError, ActionDeniedError

# Unit testing - mock the AIM client methods
def test_perform_action_approved():
    """Test that a decorated function executes when AIM approves"""
    client = MagicMock(spec=AIMClient)
    client.verify_capability.return_value = {
        "verified": True,
        "verification_id": "ver-123",
        "approved_by": "admin",
        "expires_at": "2026-01-01T00:00:00Z"
    }

    # Test your business logic independently
    result = get_users_from_db()
    assert len(result) > 0

def test_perform_action_denied():
    """Test behavior when AIM denies a capability"""
    client = MagicMock(spec=AIMClient)
    client.verify_capability.side_effect = ActionDeniedError(
        "Agent not authorized for db:write"
    )

    with pytest.raises(ActionDeniedError):
        client.verify_capability("db:write", resource="users")

# Integration testing with a real AIM server
@pytest.fixture
def aim_client():
    """Create a test client against local AIM"""
    from aim_sdk import secure
    return secure(
        "test-agent",
        aim_url="http://localhost:8080",
        force_new=True
    )

def test_agent_registration(aim_client):
    assert aim_client.agent_id is not None

def test_capability_verification(aim_client):
    result = aim_client.verify_capability("db:read", resource="test_table")
    assert result["verified"] is True

def test_request_capability(aim_client):
    result = aim_client.request_capability(
        capability_type="db:write",
        reason="Need write access for integration tests"
    )
    assert result["status"] in ["pending", "approved"]

Advanced Features

# Advanced Features

from aim_sdk import secure, AgentType

# 1. Agent Management (create agents programmatically)
admin = secure("admin-agent")

new_agent = admin.create_new_agent(
    name="data-processor",
    display_name="Data Processor Agent",
    description="Processes incoming data feeds",
    agent_type=AgentType.LANGCHAIN,
    capabilities=["db:read", "file:write", "api:call"]
)
print(f"Created agent: {new_agent['id']}")

# List all agents in the organization
agents = admin.list_agents()
for a in agents.get("agents", []):
    print(f"{a['name']}: {a['status']} (trust: {a.get('trust_score', 'N/A')})")

# Get/update/delete agents
details = admin.get_agent_details(new_agent["id"])
admin.update_agent(display_name="Updated Processor")
# admin.delete_agent(agent_id)

# 2. Security Logging (SOC/SIEM Integration)
from aim_sdk import (
    SecurityLogger, configure_security_logging,
    EventSeverity, AuthzEventType
)

# Configure from environment (SECURITY_LOG_FILE, SECURITY_LOG_FORMAT, etc.)
configure_security_logging()

# Or configure programmatically
logger = SecurityLogger(log_file="/var/log/aim/security.log")

# 3. A2A (Agent-to-Agent) Protocol
from aim_sdk import A2AClient, A2AAgentCard

# Create an A2A client for agent-to-agent communication
a2a = A2AClient.from_env()

# Create an agent card (identity for A2A)
card = A2AAgentCard(
    agent_id=admin.agent_id,
    name="admin-agent",
    capabilities=["db:read", "api:call"]
)

# 4. Credential Management
from aim_sdk import (
    load_agent_credentials, save_agent_credentials,
    list_agent_credentials, delete_agent_credentials
)

# List all saved agent credentials
creds = list_agent_credentials()
for name, info in creds.items():
    print(f"Agent: {name}, Server: {info.get('aim_url', 'N/A')}")

# 5. Attestation Caching
from aim_sdk import AttestationCache

cache = AttestationCache()
# Caches verification results to reduce AIM server round-trips

SDK Features

Security

  • • Ed25519 signatures
  • • Automatic token refresh
  • • Secure credential storage
  • • SSL/TLS verification
  • • Rate limiting

Reliability

  • • Automatic retries
  • • Attestation caching
  • • Connection pooling
  • • Graceful degradation
  • • Credential persistence

Performance

  • • Auto-detection (type, capabilities, MCP)
  • • Connection reuse
  • • JIT access (just-in-time approval)
  • • Risk-level auto-detection
  • • SOC/SIEM security logging

Version Compatibility

SDK VersionPython VersionAIM API Version
1.0.x3.8+v1
0.9.x3.7+v1-beta
0.8.x3.7+v1-alpha