Prefect Stripe Integration: Restricted API Keys, Spend Caps, and Agent Governance

Prefect's task and flow retry primitives are exactly what you want for resilient workflow orchestration. They are also the primitives that silently create duplicate Stripe charges when billing tasks raise exceptions at the wrong moment.

Prefect is a Python-native workflow orchestration framework built around two primitives: @task for individual units of work and @flow for orchestrating tasks into pipelines. Both support built-in retry semantics: @task(retries=3) re-runs the task function up to three times on failure; @flow(retries=1) re-runs the entire flow on failure. For AI agents that process payments — subscription billing, usage-based invoicing, batch charge runs — Prefect is a common orchestration layer. The retry semantics that make it reliable for database writes and API calls become dangerous when the API call is a Stripe charge.

This post covers three failure modes specific to Prefect's architecture — task-level retry, flow-level retry, and concurrent scheduled flow runs — with Prefect 3 Python SDK code for each, and the two-layer governance pattern that closes all three.

Failure mode 1: @task(retries=N) re-fires a completed Stripe charge

Prefect's @task decorator accepts a retries parameter that automatically re-runs the task function when it raises an exception. The retry runs the entire function body from the top. When a billing task calls stripe.Charge.create() and a later step in the same task raises (a database write failure, a metadata update error, a network timeout writing to a secondary system), Prefect's retry runs the task again. The second run calls stripe.Charge.create() again with the same parameters. Without an idempotency key, Stripe creates a second charge.

# billing_tasks.py (UNSAFE: no idempotency key, task-level retry)
import stripe
from prefect import task, flow

stripe.api_key = "sk_live_..."  # Bare production key, full scope

@task(retries=3, retry_delay_seconds=2)
def charge_customer(customer_id: str, amount_cents: int, billing_period: str) -> dict:
    # Stripe call succeeds: charge object created and returned.
    # stripe.Charge.create is the module-level call in stripe-python.
    charge = stripe.Charge.create(
        amount=amount_cents,
        currency="usd",
        customer=customer_id,
        description=f"Subscription {billing_period}",
        # No idempotency_key: each retry = new Stripe charge
    )

    # write_charge_record is the application's own DB helper (not shown).
    # If it raises, Prefect retries the whole task from the top and
    # stripe.Charge.create() runs again: second charge, no dedup
    write_charge_record(customer_id, charge["id"], billing_period)

    return {"charge_id": charge["id"], "status": "succeeded"}

The failure sequence: stripe.Charge.create() succeeds and returns charge ch_A. write_charge_record() raises a DatabaseTimeoutError. Prefect marks the task run as failed and schedules retry 1. Retry 1 runs charge_customer from the top. stripe.Charge.create() fires again with identical parameters. Stripe creates charge ch_B. The customer has been billed twice. If the database write fails again, Prefect schedules retry 2, creating charge ch_C. With retries=3, the customer can be charged four times (the initial run plus three retries) before Prefect gives up.

The fix: a content-hash idempotency key derived from stable billing parameters, and separation of the database write into an independent task with its own retry policy:

# billing_tasks.py — SAFE: content-hash idempotency key + separated write
import hashlib
import stripe
from prefect import task, flow

PROXY_BASE = "https://proxy.keybrake.com/stripe"

def make_billing_client(vault_key: str) -> stripe.StripeClient:
    # StripeClient takes per-service base addresses rather than a base_url argument
    return stripe.StripeClient(
        api_key=vault_key,
        base_addresses={"api": PROXY_BASE},
    )

def billing_idempotency_key(
    customer_id: str, amount_cents: int, billing_period: str
) -> str:
    raw = f"{customer_id}:{amount_cents}:{billing_period}:prefect-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]

@task(retries=3, retry_delay_seconds=2)
def charge_customer(
    customer_id: str,
    amount_cents: int,
    billing_period: str,
    vault_key: str,
) -> dict:
    client = make_billing_client(vault_key)
    idem_key = billing_idempotency_key(customer_id, amount_cents, billing_period)

    # Safe to retry — Stripe deduplicates on idempotency_key
    charge = client.charges.create(
        params={
            "amount": amount_cents,
            "currency": "usd",
            "customer": customer_id,
            "description": f"Subscription {billing_period}",
        },
        options={"idempotency_key": idem_key},
    )
    return {"charge_id": charge.id, "status": charge.status}

@task(retries=5, retry_delay_seconds=1)
def record_charge(customer_id: str, charge_id: str, billing_period: str) -> None:
    # Separate task — Stripe charge already safe, only DB write retries here
    write_charge_record(customer_id, charge_id, billing_period)

@flow
def billing_flow(customer_id: str, amount_cents: int, billing_period: str, vault_key: str):
    result = charge_customer(customer_id, amount_cents, billing_period, vault_key)
    record_charge(customer_id, result["charge_id"], billing_period)

The idempotency key is the first 32 hex characters of a SHA-256 hash over customer_id, amount_cents, billing_period, and the fixed suffix 'prefect-billing'. The input string is identical on every retry of the same billing operation, so the key is too. Stripe returns the original charge object for any repeated request carrying that key within 24 hours. Separating the database write into its own record_charge task gives it an independent retry policy: database write retries do not re-trigger the Stripe call.
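To make the difference concrete, here is a minimal, self-contained simulation of the retry sequence. FakeStripe is a hypothetical in-memory stand-in that models only one behaviour (replaying the stored charge when an idempotency key repeats), and run_with_retries is an illustrative loop rather than Prefect's retry engine.

```python
from __future__ import annotations

import hashlib
import itertools


class FakeStripe:
    """Hypothetical in-memory stand-in; models idempotency-key replay only."""

    def __init__(self) -> None:
        self.charges: list[dict] = []
        self._by_key: dict[str, dict] = {}
        self._ids = itertools.count(1)

    def create_charge(
        self, amount: int, customer: str, idempotency_key: str | None = None
    ) -> dict:
        if idempotency_key is not None and idempotency_key in self._by_key:
            return self._by_key[idempotency_key]  # replay: no new charge
        charge = {"id": f"ch_{next(self._ids)}", "amount": amount, "customer": customer}
        self.charges.append(charge)
        if idempotency_key is not None:
            self._by_key[idempotency_key] = charge
        return charge


def billing_idempotency_key(customer_id: str, amount_cents: int, billing_period: str) -> str:
    raw = f"{customer_id}:{amount_cents}:{billing_period}:prefect-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]


def run_with_retries(stripe_api: FakeStripe, use_key: bool, retries: int = 3) -> int:
    """Charge, fail the DB write on every attempt, return the number of charges created."""
    key = billing_idempotency_key("cus_abc", 2999, "2026-06") if use_key else None
    for _attempt in range(retries + 1):  # initial run plus retries
        try:
            stripe_api.create_charge(2999, "cus_abc", idempotency_key=key)
            raise TimeoutError("simulated database write failure")
        except TimeoutError:
            continue  # the orchestrator would schedule the next retry here
    return len(stripe_api.charges)


unkeyed_charges = run_with_retries(FakeStripe(), use_key=False)
keyed_charges = run_with_retries(FakeStripe(), use_key=True)
print(unkeyed_charges, keyed_charges)  # 4 1
```

Under these assumptions the unkeyed run ends with four charges and the keyed run with one, which matches the failure sequence described for retries=3.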

Failure mode 2: @flow(retries=N) re-runs billing tasks without cache protection

Prefect's flow-level retry re-runs the entire flow function on failure. Tasks within the flow are re-executed unless they are cached or explicitly skipped. By default, Prefect tasks have no result cache — each flow retry re-runs every task from scratch. When an AI agent billing flow completes the Stripe charge but fails on a downstream step — sending a confirmation webhook, updating a CRM record, writing to an analytics pipeline — the flow-level retry re-runs the billing task.

# billing_flow.py (UNSAFE: flow retry re-runs charge_customer)
import stripe
from prefect import task, flow

@task  # No retries at task level, but see the flow decorator below
def charge_customer(customer_id: str, amount_cents: int, billing_period: str) -> dict:
    charge = stripe.Charge.create(
        amount=amount_cents,
        currency="usd",
        customer=customer_id,
        description=f"Subscription {billing_period}",
        # No idempotency_key
    )
    return {"charge_id": charge["id"]}

@task
def send_confirmation_webhook(customer_id: str, charge_id: str) -> None:
    # post_to_crm is the application's own helper (not shown); if it raises,
    # the task fails and the flow-level retry kicks in
    post_to_crm(customer_id, charge_id)

@flow(retries=2, retry_delay_seconds=10)  # Flow retry re-runs all tasks
def monthly_billing_flow(customer_id: str, amount_cents: int, billing_period: str):
    result = charge_customer(customer_id, amount_cents, billing_period)
    send_confirmation_webhook(customer_id, result["charge_id"])
    # If send_confirmation_webhook fails, entire flow retries
    # charge_customer re-runs: new stripe.Charge.create() call, duplicate charge

Prefect provides a cache_key_fn parameter on @task that lets you specify a function to compute a cache key from task inputs. When a task result is cached, subsequent flow retries skip the task and return the cached result. For billing tasks, the cache key should match the idempotency key — if a charge already succeeded for this (customer_id, amount_cents, billing_period) combination, return the cached result rather than re-calling Stripe:

# billing_flow.py — SAFE: cache_key_fn prevents billing re-execution on flow retry
import hashlib
from datetime import timedelta

from prefect import task, flow

# Helpers defined in billing_tasks.py above
from billing_tasks import make_billing_client, billing_idempotency_key

def billing_cache_key(context, parameters) -> str:
    customer_id = parameters["customer_id"]
    amount_cents = parameters["amount_cents"]
    billing_period = parameters["billing_period"]
    raw = f"{customer_id}:{amount_cents}:{billing_period}:prefect-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]

@task(
    cache_key_fn=billing_cache_key,
    cache_expiration=timedelta(hours=25),  # Covers Stripe's 24h dedup window + margin
    retries=2,
    retry_delay_seconds=1,
)
def charge_customer(
    customer_id: str,
    amount_cents: int,
    billing_period: str,
    vault_key: str,
) -> dict:
    client = make_billing_client(vault_key)
    idem_key = billing_idempotency_key(customer_id, amount_cents, billing_period)

    charge = client.charges.create(
        params={
            "amount": amount_cents,
            "currency": "usd",
            "customer": customer_id,
            "description": f"Subscription {billing_period}",
        },
        options={"idempotency_key": idem_key},
    )
    return {"charge_id": charge.id, "status": charge.status}

@flow(retries=2, retry_delay_seconds=10)
def monthly_billing_flow(
    customer_id: str,
    amount_cents: int,
    billing_period: str,
    vault_key: str,
):
    # On flow retry: charge_customer sees matching cache key → returns cached result
    # Stripe is not called again
    result = charge_customer(customer_id, amount_cents, billing_period, vault_key)
    send_confirmation_webhook(customer_id, result["charge_id"])

The cache_key_fn computes the same SHA-256 hash used as the Stripe idempotency key. When the flow retries after send_confirmation_webhook fails, Prefect checks whether a cached result exists for charge_customer with the same inputs. If the prior run succeeded, the cached charge ID is returned and Stripe is not called. The idempotency key at the Stripe layer provides a second layer of protection for any scenario where the cache is not available — worker restart, cache backend failure, or a new Prefect deployment that clears prior run state.
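A quick local check can confirm that the cache key and the idempotency key really are the same value, and show which inputs affect it. The sketch below repeats the two helper functions so it runs on its own, and calls the cache key function directly with None as the context, since the function ignores it. The parameter values are made up for illustration.

```python
import hashlib


def billing_idempotency_key(customer_id: str, amount_cents: int, billing_period: str) -> str:
    raw = f"{customer_id}:{amount_cents}:{billing_period}:prefect-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]


def billing_cache_key(context, parameters) -> str:
    # context is ignored here, so None is enough for a local check
    return billing_idempotency_key(
        parameters["customer_id"],
        parameters["amount_cents"],
        parameters["billing_period"],
    )


first_run = {
    "customer_id": "cus_abc",
    "amount_cents": 2999,
    "billing_period": "2026-06",
    "vault_key": "vault_key_one",  # made-up value
}
retry_with_new_vault_key = {**first_run, "vault_key": "vault_key_two"}
retry_with_new_amount = {**first_run, "amount_cents": 3499}

key = billing_cache_key(None, first_run)
vault_key_ignored = key == billing_cache_key(None, retry_with_new_vault_key)
amount_changes_key = key != billing_cache_key(None, retry_with_new_amount)
matches_idempotency_key = key == billing_idempotency_key("cus_abc", 2999, "2026-06")
print(vault_key_ignored, amount_changes_key, matches_idempotency_key)  # True True True
```

Two design consequences follow. Issuing a fresh vault key for a retry does not invalidate the cache, because vault_key is not part of the hash. A changed amount for the same customer and period does produce a new key, so neither the cache nor the idempotency key would treat it as the same charge.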

Failure mode 3: Concurrent scheduled flow runs create simultaneous charges

Prefect deployments can run on a cron schedule. If the scheduled billing flow takes longer to complete than the scheduled interval, or if a manual run is triggered while a scheduled run is in progress, Prefect can execute two instances of the billing flow simultaneously. Both instances run independently with no cross-run deduplication. Both reach the billing step. Both call stripe.Charge.create() with identical parameters. Without an idempotency key, Stripe creates two charges. With an idempotency key but no concurrency limit, both flows submit identical Stripe requests within the 24-hour dedup window. Stripe deduplicates, but each flow believes it created the charge itself, which can leave downstream state inconsistent.

# UNSAFE: no concurrency control, bare Stripe key, no idempotency key
import stripe
from prefect import flow

@flow
def monthly_billing_flow(customer_id: str, amount_cents: int, billing_period: str):
    charge = stripe.Charge.create(
        amount=amount_cents,
        currency="usd",
        customer=customer_id,
        description=f"Subscription {billing_period}",
        # No idempotency_key
        # No concurrency limit on the deployment
        # Two simultaneous runs: two Stripe charges
    )

Prefect provides concurrency controls at more than one level, including work pool concurrency limits and named global concurrency limits that flow code can acquire through the concurrency() context manager. The safest pattern for billing flows is to allow only one concurrent billing run per customer, combined with idempotency keys and a pre-flight check that queries Stripe for an existing charge before creating one:

# SAFE: concurrency limit + pre-flight check + idempotency key
import stripe
from prefect import task, flow
from prefect.concurrency.sync import concurrency

@task
def check_existing_charge(
    customer_id: str,
    billing_period: str,
    audit_vault_key: str,
) -> str | None:
    """Return existing charge_id if one was already created this period."""
    audit_client = stripe.StripeClient(
        api_key=audit_vault_key,
        base_addresses={"api": PROXY_BASE},  # StripeClient has no base_url argument
    )
    charges = audit_client.charges.list(params={
        "customer": customer_id,
        "limit": 10,
    })
    for charge in charges.data:
        meta_period = charge.metadata.get("billing_period", "")
        if meta_period == billing_period and charge.status == "succeeded":
            return charge.id
    return None

@task(retries=2, retry_delay_seconds=1)
def charge_customer(
    customer_id: str,
    amount_cents: int,
    billing_period: str,
    vault_key: str,
) -> dict:
    client = make_billing_client(vault_key)
    idem_key = billing_idempotency_key(customer_id, amount_cents, billing_period)

    charge = client.charges.create(
        params={
            "amount": amount_cents,
            "currency": "usd",
            "customer": customer_id,
            "description": f"Subscription {billing_period}",
            "metadata": {"billing_period": billing_period},
        },
        options={"idempotency_key": idem_key},
    )
    return {"charge_id": charge.id, "status": charge.status}

@flow
def monthly_billing_flow(
    customer_id: str,
    amount_cents: int,
    billing_period: str,
    billing_vault_key: str,
    audit_vault_key: str,
):
    # Acquire a per-customer concurrency slot — only one billing run at a time
    with concurrency(f"billing-{customer_id}", occupy=1):
        existing = check_existing_charge(customer_id, billing_period, audit_vault_key)
        if existing:
            return {"charge_id": existing, "status": "already_billed"}

        result = charge_customer(
            customer_id, amount_cents, billing_period, billing_vault_key
        )
    return result

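The concurrency(f"billing-{customer_id}", occupy=1) context manager acquires a named concurrency slot scoped to the customer. If a second flow run for the same customer attempts to enter the block while the first is active, it waits. This relies on a global concurrency limit with that name and a limit of 1 being configured on the Prefect server or Prefect Cloud; check how your Prefect version treats names with no configured limit, because an unenforced limit would let both runs proceed. The check_existing_charge pre-flight task reads from Stripe using an audit-scoped vault key, one that allows only GET /v1/charges, not POST /v1/charges. It inspects only the customer's 10 most recent charges, so raise the limit or paginate if a customer can accumulate more than that within a period. If the customer was already billed this period, the flow exits without charging. The idempotency key provides a final safety net if concurrency control is bypassed.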
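The interaction between the slot and the pre-flight check can be tried out without a Prefect server. In the sketch below, a dictionary of threading.Lock objects plays the role of the named concurrency slot and a plain dict plays the role of Stripe's charge list. Both are hypothetical in-process stand-ins, and the names slot_for, ledger, and billing_run are illustrative only.

```python
from __future__ import annotations

import threading
from collections import defaultdict

# In-process stand-ins: one lock per slot name, one ledger of created charges.
_slots: dict[str, threading.Lock] = defaultdict(threading.Lock)
_slots_guard = threading.Lock()
ledger: dict[tuple[str, str], str] = {}  # (customer_id, period) -> charge_id


def slot_for(customer_id: str) -> threading.Lock:
    with _slots_guard:  # guard lookup so two threads never create two locks
        return _slots[f"billing-{customer_id}"]


def billing_run(customer_id: str, period: str, results: list[dict]) -> None:
    with slot_for(customer_id):  # only one run per customer inside this block
        existing = ledger.get((customer_id, period))  # pre-flight check
        if existing is not None:
            results.append({"charge_id": existing, "status": "already_billed"})
            return
        charge_id = f"ch_{customer_id}_{period}"
        ledger[(customer_id, period)] = charge_id  # stands in for creating the charge
        results.append({"charge_id": charge_id, "status": "succeeded"})


results: list[dict] = []
threads = [
    threading.Thread(target=billing_run, args=("cus_abc", "2026-06", results))
    for _ in range(2)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

statuses = sorted(r["status"] for r in results)
print(statuses)  # ['already_billed', 'succeeded']
```

Whichever run takes the slot first creates the charge; the other waits, then finds the existing charge and exits. Both runs end up reporting the same charge ID, which is what avoids the downstream state split.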

Governance comparison

| Scenario | Bare Stripe key | Restricted key only | Idempotency key only | Vault key + idempotency + concurrency |
| --- | --- | --- | --- | --- |
| @task retry on downstream failure | Duplicate charge | Duplicate charge | Safe: Stripe deduplicates | Safe: Stripe deduplicates; vault key limits scope |
| @flow retry re-runs billing task | Duplicate charge | Duplicate charge | Safe: Stripe deduplicates; cache_key_fn additionally skips the repeat call | Safe: cache_key_fn + idempotency key, enforced through the proxy |
| Concurrent scheduled runs | Two charges, possible state split | Two charges, possible state split | Stripe deduplicates; downstream split remains | Safe: concurrency limit serializes runs per customer |
| Runaway loop (high-frequency schedule) | Charges accumulate up to rate limit | Charges accumulate | Deduplication within 24h only | Proxy daily cap stops accumulation at configured limit |
| Key compromise | Full Stripe account access | Limited to Stripe permissions | Full access still possible | Vault key: single endpoint, daily USD cap, instant revoke |

Putting it together: a safe Prefect billing flow

The complete pattern combines the fixes for all three failure modes: content-hash idempotency keys, task-level caching keyed to the same hash, per-customer concurrency limits, pre-flight charge lookup, and vault keys routed through a spend-cap proxy:

# safe_billing.py — complete Prefect Stripe governance pattern
import hashlib
import stripe
from prefect import task, flow
from prefect.concurrency.sync import concurrency
from datetime import timedelta

PROXY_BASE = "https://proxy.keybrake.com/stripe"

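def make_client(vault_key: str) -> stripe.StripeClient:
    # Route API traffic through the proxy; StripeClient takes base_addresses, not base_url
    return stripe.StripeClient(api_key=vault_key, base_addresses={"api": PROXY_BASE})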

def billing_idem_key(customer_id: str, amount_cents: int, period: str) -> str:
    raw = f"{customer_id}:{amount_cents}:{period}:prefect-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]

def billing_cache_key(context, parameters) -> str:
    p = parameters
    return billing_idem_key(p["customer_id"], p["amount_cents"], p["billing_period"])

@task
def check_existing_charge(
    customer_id: str, period: str, audit_key: str
) -> str | None:
    client = make_client(audit_key)
    charges = client.charges.list(params={"customer": customer_id, "limit": 10})
    for ch in charges.data:
        if ch.metadata.get("billing_period") == period and ch.status == "succeeded":
            return ch.id
    return None

@task(
    cache_key_fn=billing_cache_key,
    cache_expiration=timedelta(hours=25),
    retries=2,
    retry_delay_seconds=2,
)
def create_charge(
    customer_id: str, amount_cents: int, billing_period: str, billing_key: str
) -> dict:
    client = make_client(billing_key)
    idem_key = billing_idem_key(customer_id, amount_cents, billing_period)

    charge = client.charges.create(
        params={
            "amount": amount_cents,
            "currency": "usd",
            "customer": customer_id,
            "description": f"Subscription {billing_period}",
            "metadata": {"billing_period": billing_period},
        },
        options={"idempotency_key": idem_key},
    )
    return {"charge_id": charge.id, "status": charge.status}

@flow(retries=1, retry_delay_seconds=30)
def billing_flow(
    customer_id: str,
    amount_cents: int,
    billing_period: str,
    billing_key: str,
    audit_key: str,
) -> dict:
    with concurrency(f"billing-{customer_id}", occupy=1):
        existing = check_existing_charge(customer_id, billing_period, audit_key)
        if existing:
            return {"charge_id": existing, "status": "already_billed"}

        return create_charge(customer_id, amount_cents, billing_period, billing_key)

The billing_key is a vault key scoped to POST /v1/charges with a daily USD cap set to the expected maximum charge for a single customer. The audit_key is a separate vault key scoped to GET /v1/charges only — it cannot create charges even if compromised. Both keys are revocable from the Keybrake dashboard without touching the Prefect deployment configuration.

Gap analysis

Prefect task map() and batch billing

Prefect's .map() submits a task once per element in an iterable, and the mapped task runs execute concurrently under the default task runner. A billing_flow that maps charge_customer over a list of 500 customers fires all 500 charges in parallel. If the flow fails after 300 charges complete, a flow retry re-runs all 500 charge_customer tasks. The cache protects customers whose tasks succeeded, but any customer whose task raised mid-charge (after Stripe created the charge but before the result was cached) has no cache entry and would be charged again if the cache were the only protection. The idempotency key is the final guard here; cache_key_fn alone is not sufficient because the cache entry is only written on task success.
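The success-only behaviour is easy to see in a toy model. The sketch below is a deliberately simplified cache (a dict written only after the wrapped function returns), not Prefect's implementation; run_cached, charge_then_crash, and charge_ok are hypothetical names.

```python
cache: dict = {}
stripe_calls: list = []


def run_cached(key: str, fn):
    """Return the cached result if present; store a result only on success."""
    if key in cache:
        return cache[key]
    result = fn()  # if this raises, nothing is written to the cache
    cache[key] = result
    return result


def charge_then_crash() -> dict:
    stripe_calls.append("charge")  # the side effect has already happened
    raise RuntimeError("worker crashed after Stripe responded")


def charge_ok() -> dict:
    stripe_calls.append("charge")
    return {"charge_id": "ch_1", "status": "succeeded"}


try:
    run_cached("cus_abc:2026-06", charge_then_crash)
except RuntimeError:
    pass

cache_after_crash = dict(cache)            # still empty
run_cached("cus_abc:2026-06", charge_ok)   # cache miss: calls out again
run_cached("cus_abc:2026-06", charge_ok)   # cache hit: no call
total_calls = len(stripe_calls)
print(cache_after_crash, total_calls)  # {} 2
```

The second outbound call is the one the cache cannot prevent. With an idempotency key on the request, that call would be answered with the original charge instead of creating a new one.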

Prefect artifacts and result storage

Prefect stores task results in a configured result storage backend (local filesystem, S3, GCS, Azure Blob). When create_charge returns a dict containing {"charge_id": "ch_abc123", "status": "succeeded"}, Prefect serializes and persists this result. Depending on your result storage configuration, this includes the charge ID and any metadata you return. If you return the full Stripe charge object (including customer email, amount, description), that data is persisted in your result backend. Limit what you return from billing tasks to the minimum required downstream — charge ID and status — and configure result storage with appropriate access controls.

Work pool concurrency vs flow run concurrency

Prefect's concurrency() context manager uses Prefect's global concurrency API, which requires the Prefect server or Prefect Cloud to be reachable at runtime. In self-hosted deployments where the Prefect server is unavailable (network partition, server restart), concurrency() may raise a ConcurrencySlotAcquisitionError. Configure a fallback — either fail fast and do not charge, or use an external distributed lock (Redis SETNX) as a belt-and-suspenders guard alongside Prefect's concurrency API. The idempotency key at the Stripe layer remains the last line of defense regardless of concurrency control failures.
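One way to express the fail-fast option is a small wrapper that refuses to charge when the slot cannot be acquired. This is a sketch under stated assumptions: BillingAborted and charge_fail_closed are hypothetical names, make_slot is any zero-argument callable returning a context manager (for example a lambda wrapping Prefect's concurrency(...) call), and the broad except is intentional so that every acquisition failure blocks billing.

```python
from contextlib import ExitStack, nullcontext
from typing import Callable, ContextManager


class BillingAborted(RuntimeError):
    """Raised instead of charging when the concurrency slot is unavailable."""


def charge_fail_closed(
    make_slot: Callable[[], ContextManager],
    charge: Callable[[], dict],
) -> dict:
    with ExitStack() as stack:
        try:
            # Covers failures both when building the slot and when entering it.
            stack.enter_context(make_slot())
        except Exception as exc:
            raise BillingAborted("concurrency slot unavailable; not charging") from exc
        return charge()  # errors from the charge itself propagate unchanged


# Usage with stand-ins: a slot that cannot be reached, then one that can.
calls: list = []


def unreachable_slot():
    raise ConnectionError("orchestration API unreachable")


def do_charge() -> dict:
    calls.append("charged")
    return {"charge_id": "ch_1", "status": "succeeded"}


try:
    charge_fail_closed(unreachable_slot, do_charge)
    aborted = False
except BillingAborted:
    aborted = True

ok_result = charge_fail_closed(nullcontext, do_charge)
print(aborted, calls)  # True ['charged']
```

The charge callable runs only after the slot has been entered, so an outage in the concurrency layer results in a failed flow run rather than an unguarded charge.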

Prefect deployments triggered by event-driven infrastructure

Prefect deployments can be triggered by webhooks, event sources, and external automation. An event-driven deployment that fires on a payment.due event from an upstream system can receive duplicate events — most message queues deliver at-least-once. A Prefect deployment triggered twice by the same event fires two flow runs with identical parameters. The idempotency key handles this at the Stripe layer, but the concurrency limit and pre-flight check prevent both flows from proceeding past the billing step simultaneously, which avoids the state split that idempotency alone cannot resolve.

Enforcement tests

# test_billing_governance.py
from unittest.mock import patch, MagicMock

# The functions under test live in safe_billing.py (the complete pattern above)
from safe_billing import billing_idem_key, check_existing_charge, create_charge

def test_idempotency_key_is_stable_across_retries():
    k1 = billing_idem_key("cus_abc", 2999, "2026-06")
    k2 = billing_idem_key("cus_abc", 2999, "2026-06")
    assert k1 == k2, "Idempotency key must be deterministic for same inputs"

def test_idempotency_key_differs_across_periods():
    k1 = billing_idem_key("cus_abc", 2999, "2026-06")
    k2 = billing_idem_key("cus_abc", 2999, "2026-07")
    assert k1 != k2, "Different billing periods must produce different keys"

def test_check_existing_charge_returns_id_when_found():
    mock_client = MagicMock()
    mock_charge = MagicMock(
        id="ch_existing",
        status="succeeded",
        metadata={"billing_period": "2026-06"},
    )
    mock_client.charges.list.return_value = MagicMock(data=[mock_charge])

    # .fn calls the undecorated function, so no Prefect runtime is needed
    with patch("safe_billing.make_client", return_value=mock_client):
        result = check_existing_charge.fn("cus_abc", "2026-06", "vault_audit_xxx")

    assert result == "ch_existing"

def test_check_existing_charge_returns_none_when_not_found():
    mock_client = MagicMock()
    mock_client.charges.list.return_value = MagicMock(data=[])

    with patch("safe_billing.make_client", return_value=mock_client):
        result = check_existing_charge.fn("cus_abc", "2026-06", "vault_audit_xxx")

    assert result is None

def test_create_charge_passes_idempotency_key_to_stripe():
    mock_client = MagicMock()
    mock_client.charges.create.return_value = MagicMock(id="ch_new", status="succeeded")

    with patch("safe_billing.make_client", return_value=mock_client):
        result = create_charge.fn("cus_abc", 2999, "2026-06", "vault_billing_xxx")

    # create_charge passes options as a keyword argument
    options = mock_client.charges.create.call_args.kwargs["options"]
    assert "idempotency_key" in options, "Stripe call must include idempotency_key"
    assert options["idempotency_key"] == billing_idem_key("cus_abc", 2999, "2026-06")
    assert result["charge_id"] == "ch_new"

FAQ

Can I use Prefect's built-in result caching as the sole protection against duplicate charges?

No. Prefect's result cache is written on task success and uses your configured result storage backend. The cache entry does not exist if the task raised an exception — even if the Stripe charge completed successfully before the exception. A task retry after such a failure would re-call Stripe without finding a cache entry. The idempotency key at the Stripe API layer is the correct protection for retry scenarios; the cache protects against flow-level retries after a fully successful task.

Does Stripe's 24-hour idempotency window cover all Prefect retry scenarios?

For task and flow retries, yes — Prefect retries happen within seconds to minutes of the original failure, well within the 24-hour window. For flow runs triggered by a rescheduled deployment that missed its cron window and backfilled, multiple flow runs may fire in quick succession for adjacent billing periods. Ensure your idempotency key includes the billing period to prevent cross-period deduplication collisions.

What happens if two Prefect workers process the same billing task simultaneously?

Prefect work pools can run multiple concurrent workers. If two workers pick up the same flow run (which should not happen in a correctly configured deployment, but can occur during work pool configuration changes or worker restarts), both workers would execute create_charge. The per-customer concurrency limit prevents both from acquiring the billing slot simultaneously. If both somehow acquired the slot (race condition at the Prefect server level), the idempotency key at Stripe ensures only one charge is created.

Does the vault key approach require changing my existing Stripe integration code?

The change is small: replace stripe.api_key = "sk_live_..." with a client built as stripe.StripeClient(api_key=vault_key, base_addresses={"api": "https://proxy.keybrake.com/stripe"}), and make calls through that client, as the SAFE examples above do with client.charges.create(params=..., options=...). The request parameters sent to Stripe are unchanged. The vault key is issued per-agent-run from the Keybrake dashboard with a policy specifying allowed endpoints and a daily USD cap. The proxy forwards all calls to Stripe and enforces the policy in the forwarding layer.

How do I handle the case where check_existing_charge itself fails?

The check_existing_charge task uses the audit vault key scoped to GET /v1/charges. If the Stripe API is unreachable, the task should raise and the flow should fail without proceeding to billing. This is safer than failing open. Configure @task(retries=2) on check_existing_charge to tolerate transient network errors, but do not configure a fallback that skips the pre-flight check: a check that is skipped whenever it fails provides no protection in exactly the situations where it is needed.

What is the right daily USD cap to set on the billing vault key?

Set the cap to the maximum expected charge for a single customer in one billing cycle, not the total for all customers. Each flow run should use a vault key issued per-customer-per-billing-period, so the cap applies per run. A $99/month subscription flow could use a vault key capped at $110 (roughly 11% headroom for currency fluctuation or tax). If a runaway loop fires multiple times for the same customer, the proxy blocks any charge that would take the day's total over $110, so a second $99 charge does not go through. For batch billing flows that process multiple customers in one run, issue one vault key per customer with individual caps rather than one shared key for the entire batch.
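The arithmetic behind the $110 example can be sketched with a toy cap. DailySpendCap is a hypothetical class written for illustration; it assumes the cap rejects any charge that would push the running total above the limit, which is one plausible reading of a daily cap and not a description of how the Keybrake proxy is implemented.

```python
class DailySpendCap:
    """Toy model: approve a charge only if the running total stays within the cap."""

    def __init__(self, cap_cents: int) -> None:
        self.cap_cents = cap_cents
        self.spent_cents = 0

    def authorize(self, amount_cents: int) -> bool:
        if self.spent_cents + amount_cents > self.cap_cents:
            return False  # would exceed the cap: reject, total unchanged
        self.spent_cents += amount_cents
        return True


cap = DailySpendCap(cap_cents=11_000)  # $110.00
# A runaway loop attempting the same $99.00 charge five times
decisions = [cap.authorize(9_900) for _ in range(5)]
print(decisions, cap.spent_cents)  # [True, False, False, False, False] 9900
```

With a cap sized to one customer's charge plus headroom, only the first attempt passes. A cap sized to the whole batch would let a loop repeat the same charge many times before anything was blocked, which is the reason for issuing one key per customer.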

Vault keys for your Prefect billing flows

Keybrake issues vault keys scoped to specific Stripe endpoints with daily USD caps and per-run revocation. One line changes your Prefect billing flow from a bare sk_live_ key to a governed proxy that blocks runaway retries at the infrastructure layer.