Build reliable AI agents on Bluestone PIM

This guide shows how to build AI agents and automation on top of the Bluestone PIM Management API. It combines the API's existing primitives - OAuth2, cursor pagination, bulk operations, async task polling, webhooks, a test environment, and machine-readable OpenAPI specs - into a small set of patterns you can compose into reliable workflows.

An agent here means any automated client that authenticates once and then reads, writes, and reacts to PIM data without a human in the loop: a sync job, an LLM tool-use loop, a data pipeline, or a scheduled enrichment task. The patterns below are written so the same automation behaves predictably whether it processes ten products or a hundred thousand.

This guide builds on the dedicated guides for each primitive. Read them when you need depth.

Patterns at a glance

| Pattern | Use it for | API feature | Reference |
| --- | --- | --- | --- |
| Authenticate once, cache token | Avoid re-authenticating on every call | OAuth2 client credentials | Generate token |
| Discover schema | Learn attributes, contexts, catalogs, product types, and request shapes before reading or writing | Definitions, contexts, catalogs, OpenAPI | Find all attribute definitions |
| Read large datasets | Export or scan many products without page drift | Cursor pagination | Get products using cursor with views |
| Update many products | Apply the same change to many products in fewer calls | Bulk operations | Add attribute values in given set of products |
| Track long-running work | Wait for an async job without re-issuing it | Async task status | Get async task status |
| Survive transient failures | Recover from 429, 5xx, and network errors | Retry with backoff and jitter | Rate limits, retries, and backoff |
| Write safely | Avoid applying the same change twice on retry | Client-side deduplication | This guide |
| React to changes | Replace polling with push | Webhooks | Webhook events and delivery |
| Test before production | Validate automation against non-production data | Test environment | This guide |

Choose an environment: use the test environment first

Develop and validate every agent against the test environment before pointing it at production data. Test credentials are separate from production credentials, and a write in test cannot affect production.

See Environments for API and auth base URLs.

Each component is mounted on a path under the base URL, for example /pim, /metadata, /completeness-score, /notification-external, and /global-settings. The examples below use those prefixes.

Key endpoints and specs

| Use case | Method and path | Reference |
| --- | --- | --- |
| Generate OAuth2 bearer token | POST /op/token | Generate token |
| Read products with cursor pagination and views | POST /pim/products/cursor/views/all | Get products using cursor with views |
| Add attribute values to many products | POST /pim/products/attributes/add/by-ids | Add attribute values in given set of products |
| Poll PIM async task status | GET /pim/async/status/{taskId} | Get async task status |
| Poll metadata async task status | GET /metadata/async/task/{taskId} | Get task status |
| Discover attribute definitions | GET /pim/definitions | Find all attribute definitions |
| Discover catalogs | GET /pim/catalogs | List catalogs |
| Discover contexts | GET /global-settings/context | Global settings OpenAPI |
| Validate completeness issues | GET /completeness-score/validations/{entityId}/{context} | Get validation issues |
| Create a webhook | POST /notification-external/webhooks | Create webhook |
| Subscribe a webhook to events | PUT /notification-external/subscriptions/webhook/{webhookId}/events | Subscribe for given events for given webhook |

OpenAPI specs are available as JSON, one per component, and agents can use them for planning, validation, and generated clients.

An LLM index is available at /llms.txt. AI tools can also connect to the Bluestone PIM docs MCP server to list specs, search endpoints, inspect request schemas, and run requests with your own bearer token.

A reusable client foundation

Most of the patterns share the same foundation: cache the token, retry transient failures, and switch environments with two constants. The examples in this guide use the Python client below. A Node/TypeScript version follows for teams building JavaScript agents.

Python

Set BLUESTONE_API_BASE and BLUESTONE_AUTH_BASE from the Environments page.

import random
import time
from typing import Any

import requests

# Choose an environment by setting these two constants. Start with Test.
BLUESTONE_AUTH_BASE = "https://idp.test.bluestonepim.com/op/token"
BLUESTONE_API_BASE = "https://api.test.bluestonepim.com"
CLIENT_ID = "YOUR_CLIENT_ID"
CLIENT_SECRET = "YOUR_CLIENT_SECRET"

RETRY_STATUSES = {429, 500, 502, 503, 504}
MAX_RETRIES = 5
TIMEOUT_SECONDS = 30


class BluestoneClient:
    """Minimal Management API client for agents and automation."""

    def __init__(
        self,
        auth_url: str = BLUESTONE_AUTH_BASE,
        base_url: str = BLUESTONE_API_BASE,
        client_id: str = CLIENT_ID,
        client_secret: str = CLIENT_SECRET,
    ):
        self._auth_url = auth_url
        self._base_url = base_url.rstrip("/")
        self._client_id = client_id
        self._client_secret = client_secret
        self._token: str | None = None
        self._token_expiry = 0.0

    def _refresh_token(self) -> None:
        response = requests.post(
            self._auth_url,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "grant_type": "client_credentials",
                "client_id": self._client_id,
                "client_secret": self._client_secret,
            },
            timeout=TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        payload = response.json()
        self._token = payload["access_token"]
        expires_in = int(payload.get("expires_in", 3600))
        self._token_expiry = time.monotonic() + max(expires_in - 60, 0)

    def _auth_header(self) -> str:
        if self._token is None or time.monotonic() >= self._token_expiry:
            self._refresh_token()
        return f"Bearer {self._token}"

    @staticmethod
    def _retry_delay(attempt: int) -> float:
        return min(2 ** attempt + random.uniform(0, 0.25), 30)

    def request(
        self,
        method: str,
        path: str,
        json_body: Any = None,
        params: dict[str, Any] | None = None,
    ) -> Any:
        url = f"{self._base_url}/{path.lstrip('/')}"
        refreshed_on_401 = False

        for attempt in range(MAX_RETRIES):
            headers = {
                "Authorization": self._auth_header(),
                "Content-Type": "application/json",
                "context-fallback": "true",
            }
            try:
                response = requests.request(
                    method,
                    url,
                    headers=headers,
                    json=json_body,
                    params=params,
                    timeout=TIMEOUT_SECONDS,
                )

                if response.status_code == 401 and not refreshed_on_401:
                    refreshed_on_401 = True
                    self._refresh_token()
                    continue

                if response.status_code not in RETRY_STATUSES:
                    response.raise_for_status()
                    return response.json() if response.content else None

                if attempt == MAX_RETRIES - 1:
                    response.raise_for_status()

                time.sleep(self._retry_delay(attempt))
            except (requests.ConnectionError, requests.Timeout):
                if attempt == MAX_RETRIES - 1:
                    raise
                time.sleep(self._retry_delay(attempt))

        raise RuntimeError("Request failed after retries")

Node/TypeScript

Set BLUESTONE_API_BASE and BLUESTONE_AUTH_BASE from the Environments page.

const BLUESTONE_AUTH_BASE = "https://idp.test.bluestonepim.com/op/token";
const BLUESTONE_API_BASE = "https://api.test.bluestonepim.com";
const CLIENT_ID = "YOUR_CLIENT_ID";
const CLIENT_SECRET = "YOUR_CLIENT_SECRET";

const RETRY_STATUSES = new Set([429, 500, 502, 503, 504]);
const MAX_RETRIES = 5;

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
const retryDelayMs = (attempt: number) =>
  Math.min(2 ** attempt * 1000 + Math.floor(Math.random() * 250), 30000);

export class BluestoneClient {
  private token: string | null = null;
  private tokenExpiry = 0;

  private async refreshToken(): Promise<void> {
    const body = new URLSearchParams({
      grant_type: "client_credentials",
      client_id: CLIENT_ID,
      client_secret: CLIENT_SECRET,
    });
    const res = await fetch(BLUESTONE_AUTH_BASE, {
      method: "POST",
      headers: { "Content-Type": "application/x-www-form-urlencoded" },
      body,
    });
    if (!res.ok) throw new Error(`Token request failed: ${res.status}`);
    const json = await res.json();
    this.token = json.access_token;
    this.tokenExpiry =
      Date.now() + Math.max((json.expires_in ?? 3600) - 60, 0) * 1000;
  }

  private async authHeader(): Promise<string> {
    if (!this.token || Date.now() >= this.tokenExpiry) await this.refreshToken();
    return `Bearer ${this.token}`;
  }

  async request<T>(method: string, path: string, body?: unknown): Promise<T> {
    const url = `${BLUESTONE_API_BASE}/${path.replace(/^\/+/, "")}`;
    let refreshedOn401 = false;

    for (let attempt = 0; attempt < MAX_RETRIES; attempt += 1) {
      try {
        const res = await fetch(url, {
          method,
          headers: {
            Authorization: await this.authHeader(),
            "Content-Type": "application/json",
            "context-fallback": "true",
          },
          body: body === undefined ? undefined : JSON.stringify(body),
        });

        if (res.status === 401 && !refreshedOn401) {
          refreshedOn401 = true;
          await this.refreshToken();
          continue;
        }

        if (!RETRY_STATUSES.has(res.status)) {
          if (!res.ok) throw new Error(`API request failed: ${res.status}`);
          // Mirror the Python client: treat any empty body (204 or otherwise) as "no result".
          const text = await res.text();
          return (text ? JSON.parse(text) : undefined) as T;
        }

        if (attempt === MAX_RETRIES - 1) {
          throw new Error(`Failed after retries: ${res.status}`);
        }
        await sleep(retryDelayMs(attempt));
      } catch (error) {
        if (!(error instanceof TypeError) || attempt === MAX_RETRIES - 1) {
          throw error;
        }
        await sleep(retryDelayMs(attempt));
      }
    }

    throw new Error("Request failed after retries");
  }
}

Pattern 1: Authenticate once, cache the token, refresh safely

The Management API uses OAuth2 client credentials. Request a token from the token URL for your environment, then reuse it until shortly before it expires. The token response includes expires_in in seconds; refresh a minute early so a long-running job never sends an expired token mid-flight.

Do not request a new token per API call. Re-authenticating on every request puts needless load on the token endpoint and adds latency to every operation. The BluestoneClient above caches the token and refreshes it on demand, including a single forced refresh if the API unexpectedly returns 401.

Treat the client secret and access token as secrets. Use test credentials while developing, and never paste production credentials into a shared chat transcript or agent prompt. An agent can discover the token flow through the get-bearer-token spec on the docs MCP server.
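One way to keep the client secret out of source code and agent prompts is to read it from the process environment at startup. The sketch below assumes two environment variable names, BLUESTONE_CLIENT_ID and BLUESTONE_CLIENT_SECRET, which are hypothetical and not defined by Bluestone PIM; substitute whatever your secret manager provides. The redact helper is likewise an illustration of how to log a credential reference without exposing it.

```python
import os


def load_credentials(env=None):
    """Read client credentials from environment variables instead of source code.

    BLUESTONE_CLIENT_ID and BLUESTONE_CLIENT_SECRET are hypothetical names
    chosen for this sketch; they are not defined by the API.
    """
    env = os.environ if env is None else env
    required = ("BLUESTONE_CLIENT_ID", "BLUESTONE_CLIENT_SECRET")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return env["BLUESTONE_CLIENT_ID"], env["BLUESTONE_CLIENT_SECRET"]


def redact(secret, visible=4):
    """Return a log-safe form of a secret that shows only its last characters."""
    if len(secret) <= visible:
        return "*" * len(secret)
    return "*" * (len(secret) - visible) + secret[-visible:]
```

The returned pair can be passed to the client as BluestoneClient(client_id=..., client_secret=...), so the placeholder constants never need to hold real values.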

Pattern 2: Discover the schema before reading or writing

Before an agent reads or writes product data, let it learn the shape of the catalog instead of hard-coding IDs. Bluestone PIM exposes the schema through dedicated endpoints and through machine-readable OpenAPI specs.

| What to discover | Endpoint | Reference |
| --- | --- | --- |
| Attribute definitions | GET /pim/definitions | Find all attribute definitions |
| Catalogs and categories | GET /pim/catalogs | List catalogs |
| Contexts | GET /global-settings/context | Global settings OpenAPI |
| Product metadata | metadata on product views | Get products using cursor with views |
| The full API surface | OpenAPI JSON per component | Generate SDKs and client code from OpenAPI |

client = BluestoneClient()

# Attribute definitions: map attribute name to definition ID for later writes.
definitions = client.request("GET", "/pim/definitions")

# Contexts: use this before writing localized/context-specific values.
contexts = client.request("GET", "/global-settings/context")

# Catalogs: the category trees available to read and write against.
catalogs = client.request("GET", "/pim/catalogs")

For schema discovery from inside an editor or LLM tool-use loop, the docs MCP server exposes list-specs, list-endpoints, search-endpoints, and get-endpoint so the agent can find the right operation, parameters, and request body without a human pasting endpoint docs.
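Once the definitions response is in hand, an agent typically turns it into a lookup table so later writes can refer to attributes by name. The sketch below assumes each definition is a JSON object carrying an identifier and a human-readable name under the keys id and name. Those key names are an assumption of this example, not something this guide confirms, so check the Find all attribute definitions reference and adjust the two parameters to match.

```python
def index_definitions(definitions, name_field="name", id_field="id"):
    """Build a name -> definition ID map from a list of definition objects.

    The default field names are assumptions for illustration only.
    Raises ValueError when two different IDs share one name, because a
    write that picks the wrong definition is worse than a failed lookup.
    """
    index = {}
    for definition in definitions:
        name = definition.get(name_field)
        definition_id = definition.get(id_field)
        if name is None or definition_id is None:
            continue  # skip entries that lack either field
        if name in index and index[name] != definition_id:
            raise ValueError(f"Ambiguous attribute name: {name!r}")
        index[name] = definition_id
    return index


# Made-up sample data in the assumed shape, not real API output.
sample = [
    {"id": "def-001", "name": "Color"},
    {"id": "def-002", "name": "Material"},
]
lookup = index_definitions(sample)
```

Failing loudly on duplicate names is a deliberate choice here: an agent that silently picks one of two same-named definitions can write to the wrong attribute.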

Pattern 3: Read large datasets with cursor pagination

Use cursor pagination to read large result sets without the page-number drift that happens when data changes between pages. Send a count from 1 to 100 and, on every request after the first, pass the nextCursor from the previous response as cursor. Request only the views you need, for example {"type": "METADATA"} when you only need product metadata.

The response envelope is { "data": [...], "nextCursor": "..." }. Keep paging until data is empty or nextCursor is absent.

def scan_products(client: BluestoneClient, views=("METADATA",), count=100):
    """Yield every product, one page at a time, using cursor pagination."""
    cursor = None
    while True:
        body = {
            "count": count,
            "views": [{"type": view} for view in views],
        }
        if cursor:
            body["cursor"] = cursor

        page = client.request("POST", "/pim/products/cursor/views/all", json_body=body)
        data = page.get("data", [])
        for product in data:
            yield product

        cursor = page.get("nextCursor")
        if not cursor or not data:
            break

Cursor pagination also reduces rate-limit pressure: each call returns up to 100 products, so a full scan costs far fewer requests than fetching products one by one. See Rate limits, retries, and backoff for the request-pacing rules a scan should stay under.
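A scan can pace itself on the client side rather than waiting to be throttled. The sketch below is a minimal fixed-interval pacer; the Pacer class and the 0.5-second interval in the usage note are hypothetical, and the actual limit to stay under is defined in Rate limits, retries, and backoff, not here. The clock and sleep functions are injectable so the behavior can be checked without real waiting.

```python
import time


class Pacer:
    """Space successive calls at least min_interval seconds apart."""

    def __init__(self, min_interval, clock=time.monotonic, sleep=time.sleep):
        self._min_interval = min_interval
        self._clock = clock
        self._sleep = sleep
        self._next_allowed = 0.0

    def wait(self):
        """Block until the next call is allowed; return the seconds slept."""
        now = self._clock()
        delay = self._next_allowed - now
        if delay > 0:
            self._sleep(delay)
            now += delay
        else:
            delay = 0.0
        # The next call may start no earlier than one interval from now.
        self._next_allowed = now + self._min_interval
        return delay
```

To use it, create one pacer per process, for example Pacer(0.5), and call pacer.wait() immediately before each page request inside the scan loop.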

Pattern 4: Update many products with bulk operations

When an agent applies the same change to many products, use a bulk endpoint instead of one request per product. For example, Add attribute values in given set of products sets one attribute definition's values on up to 100 product IDs in a single call and returns 204 No Content on success.

def bulk_add_attribute(client, attribute_definition_id, attribute_values, product_ids):
    """Add attribute values to up to 100 products per call. Returns nothing on 204."""
    for start in range(0, len(product_ids), 100):
        batch = product_ids[start:start + 100]
        client.request(
            "POST",
            "/pim/products/attributes/add/by-ids",
            json_body={
                "attributeDefinitionId": attribute_definition_id,
                "attributeValues": attribute_values,
                "productIds": batch,
            },
        )

Batch the work yourself when you have more than 100 IDs, as shown above. Bulk endpoints turn N writes into ceil(N / 100) requests, which is the biggest lever for staying inside the rate limit during a large automation run.
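The request-count arithmetic can be made concrete with a small helper. This is an illustrative sketch; chunked and request_count are names introduced here and are not part of the client above.

```python
import math


def chunked(items, size=100):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def request_count(total_ids, batch_size=100):
    """Number of bulk calls needed for total_ids products: ceil(N / batch_size)."""
    return math.ceil(total_ids / batch_size)


batch_sizes = [len(batch) for batch in chunked(list(range(250)))]
# 250 IDs split into batches of 100, 100, and 50: three requests instead of 250.
```

At the scale mentioned in the introduction, request_count(100_000) is 1000 bulk calls in place of a hundred thousand single-product writes.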

Pattern 5: Track long-running work with async task polling

Some operations are asynchronous: they accept the request, return quickly with a task ID, and do the work in the background. Poll the task status endpoint until the job reaches a terminal state instead of re-issuing the original operation.

  • PIM async tasks: GET /pim/async/status/{taskId} - Get async task status. The response includes fields such as status, count, and errorMessage; known statuses include WAITING and COMPLETED.
  • Metadata async tasks: GET /metadata/async/task/{taskId} - Get task status.

Poll with backoff. Start with a short delay and increase it while the task is still in progress; never poll in a tight loop.

def poll_job_status(
    client: BluestoneClient,
    task_id: str,
    poll_interval=2.0,
    max_interval=30.0,
    timeout=600.0,
) -> dict:
    """Poll a PIM async task until it completes, reports an error, or times out."""
    deadline = time.monotonic() + timeout
    delay = poll_interval

    while time.monotonic() < deadline:
        status = client.request("GET", f"/pim/async/status/{task_id}")
        state = status.get("status")
        if state == "COMPLETED":
            return status
        if status.get("errorMessage"):
            raise RuntimeError(f"Job {task_id} failed: {status['errorMessage']}")

        time.sleep(delay)
        delay = min(delay * 2, max_interval)

    raise TimeoutError(f"Job {task_id} did not complete within {timeout}s")

Use this helper for any PIM operation that returns a task ID and 202 Accepted; metadata tasks report their status at GET /metadata/async/task/{taskId} instead. Synchronous endpoints, such as the bulk attribute add in Pattern 4, return their result or 204 directly and need no polling.

Pattern 6: Handle 429, 5xx, and network errors

Any agent that runs long enough will hit a transient failure. Retry 429, transient 5xx (500, 502, 503, 504), and network errors with exponential backoff and jitter, as the BluestoneClient does. Do not retry other 4xx errors: a 400, 403, 404, or 409 will not succeed on retry and signals a bug, permissions issue, or data problem to fix. A 401 is the one partial exception: the BluestoneClient refreshes the token once and retries, and a second 401 should be treated as a credentials problem.

Rate limits are applied per customer organization and shared across all of that organization's clients and tokens, so creating more OAuth clients does not raise your ceiling. Bluestone PIM returns 429 when you exceed the limit but does not return Retry-After or X-RateLimit-* headers for the per-second throttle. Because no header tells you how long to wait, use a client-side default backoff that starts around 1 second and grows exponentially with jitter. The full rules, including burst behavior, are in Rate limits, retries, and backoff.
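The retry rules in this pattern can be written down as two small pure functions, which makes them easy to unit test separately from any HTTP code. This is a sketch; classify_status and backoff_schedule are names introduced for illustration. The base of 1 second and cap of 30 seconds mirror the BluestoneClient defaults above, and jitter is passed in as a function so the schedule is deterministic under test.

```python
RETRYABLE_STATUSES = frozenset({429, 500, 502, 503, 504})


def classify_status(status):
    """Map an HTTP status code to the action an agent should take."""
    if 200 <= status < 300:
        return "ok"
    if status in RETRYABLE_STATUSES:
        return "retry"
    if status == 401:
        return "refresh-token-once"
    # Everything else (400, 403, 404, 409, ...) will not succeed on retry.
    return "fail"


def backoff_schedule(retries=5, base=1.0, cap=30.0, jitter=lambda: 0.0):
    """Delays in seconds for each retry: base * 2**attempt plus jitter, capped."""
    return [min(base * 2 ** attempt + jitter(), cap) for attempt in range(retries)]
```

In production, pass a random source such as jitter=lambda: random.uniform(0, 0.25) so that clients sharing one organization's limit do not retry in lockstep.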

Pattern 7: Idempotency and safe writes

Bluestone PIM does not currently expose server-side idempotency keys for write operations, so a retried write is not automatically de-duplicated by the server. Make your own writes safe to retry:

  • Prefer naturally idempotent operations. Setting an attribute to a known value, or upserting by a stable product number, produces the same result whether it runs once or twice. Favor these over operations whose effect compounds on repetition.
  • Track work with a deterministic client-side job ID. Derive an idempotency key from the logical unit of work, for example a hash of sync run + product ID + attribute, record it once the write succeeds, and skip it if you see it again.
  • Make bulk batches replayable. Because a bulk call can succeed server-side even if your process dies before reading the response, design batches so re-applying them is harmless, and key your dedup store on the batch rather than the whole job.

class IdempotentWriter:
    """Apply each logical write at most once, even across retries and restarts."""

    def __init__(self, client: BluestoneClient):
        self._client = client
        self._done: set[str] = set()  # Back this with a database for cross-run safety.

    def write_once(self, job_key: str, method: str, path: str, json_body) -> None:
        if job_key in self._done:
            return
        self._client.request(method, path, json_body=json_body)
        self._done.add(job_key)

This is recommended client behavior, not an implemented server-side idempotency feature. Keep that distinction clear in agent prompts, implementation notes, and operator-facing logs.
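The deterministic key and the durable store mentioned above could look like the following sketch. It uses SQLite from the Python standard library purely as an example of a persistent backing store; job_key and SqliteDoneStore are hypothetical names, and any database with a unique-key constraint would serve the same purpose.

```python
import hashlib
import sqlite3


def job_key(sync_run, product_id, attribute):
    """Derive a stable key from the logical unit of work.

    The unit separator keeps ("a", "bc") and ("ab", "c") from colliding.
    """
    raw = "\x1f".join((sync_run, product_id, attribute))
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


class SqliteDoneStore:
    """Set-like store of completed job keys that survives process restarts."""

    def __init__(self, path=":memory:"):
        self._db = sqlite3.connect(path)
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS done (job_key TEXT PRIMARY KEY)"
        )
        self._db.commit()

    def __contains__(self, key):
        row = self._db.execute(
            "SELECT 1 FROM done WHERE job_key = ?", (key,)
        ).fetchone()
        return row is not None

    def add(self, key):
        # INSERT OR IGNORE makes recording the same key twice harmless.
        self._db.execute("INSERT OR IGNORE INTO done (job_key) VALUES (?)", (key,))
        self._db.commit()
```

Because the store supports the in operator and add, it can stand in for the in-memory set in a writer like IdempotentWriter. Pass a file path instead of ":memory:" for safety across runs.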

Pattern 8: Use webhooks instead of polling where possible

Polling for changes wastes requests and adds latency. Where a workflow reacts to changes, subscribe to webhooks so Bluestone PIM pushes events to you. Use the event as a signal that something changed, then read current state from the Management API for the affected entity.

Webhook delivery order is not guaranteed, and the payload is a change signal rather than a full snapshot. An agent should fetch the current entity state when order or completeness matters. Webhook messages are signed with HMAC-SHA256 in the x-bs-signature header; verify the signature against the raw request body before processing. See Webhook events and delivery for event types, payloads, retry behavior, and complete verified receiver examples.
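The shape of the signature check is roughly as follows. This is a sketch under stated assumptions, not the documented receiver: it assumes the HMAC key is the secret supplied when the webhook was created, and because this guide does not state how the x-bs-signature value is encoded, it accepts either a lowercase hex digest or a Base64 digest. Confirm the exact encoding in Webhook events and delivery before relying on it. The verify_signature name is introduced here for illustration.

```python
import base64
import hashlib
import hmac


def verify_signature(raw_body: bytes, secret: str, signature_header: str) -> bool:
    """Check an HMAC-SHA256 signature over the raw request body.

    Assumption: the header carries the digest as hex or Base64. The real
    encoding is defined by the webhook documentation, not by this sketch.
    """
    digest = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).digest()
    candidates = (
        digest.hex().encode("ascii"),
        base64.b64encode(digest),
    )
    received = signature_header.strip().encode("utf-8")
    # compare_digest runs in constant time, which avoids leaking match length.
    return any(hmac.compare_digest(received, candidate) for candidate in candidates)
```

The important detail is that raw_body must be the exact bytes received, read before any JSON parsing; re-serializing the parsed payload can change whitespace or key order and invalidate the signature.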

Worked examples

The examples reuse BluestoneClient and the helpers above. Replace the credentials, IDs, and the BLUESTONE_AUTH_BASE and BLUESTONE_API_BASE constants for your environment. Keep them on the test environment until the workflow is proven.

Export products changed since last sync

Cursor pagination has no native "modified since" filter, but the METADATA view returns each product's lastUpdate timestamp as epoch milliseconds. Scan with the cursor, keep a watermark from the previous run, and emit only the products that changed since then. For a low-volume, near-real-time alternative, drive this from webhooks instead of scanning.

def export_changed_products(client: BluestoneClient, last_sync_ms: int):
    """Yield products whose metadata.lastUpdate is newer than the last sync watermark."""
    for product in scan_products(client, views=("METADATA",)):
        metadata = product.get("metadata") or {}
        last_update = metadata.get("lastUpdate")
        if last_update and int(last_update) > last_sync_ms:
            yield product


if __name__ == "__main__":
    client = BluestoneClient()
    last_sync_ms = 0  # Load the saved watermark; 0 means "full export".

    changed = list(export_changed_products(client, last_sync_ms))
    print(f"{len(changed)} products changed since {last_sync_ms}")
    for product in changed:
        meta = product.get("metadata", {})
        print(product.get("id"), meta.get("number"), meta.get("type"), meta.get("lastUpdate"))

    # Advance the watermark to the newest change seen, then persist it.
    new_watermark = max(
        (int((p.get("metadata") or {}).get("lastUpdate", 0)) for p in changed),
        default=last_sync_ms,
    )
    print(f"Next watermark: {new_watermark}")

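Store the highest processed metadata.lastUpdate after a successful run. If two products share the same timestamp, or a product is updated mid-scan after its page was already read, the strict greater-than comparison can miss a change. Keep a small overlap window, or store processed product IDs for the watermark timestamp, so the next run can pick up and deduplicate those products safely.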
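Persisting the watermark can be as simple as a small JSON file. The sketch below is one possible approach, assuming a local file is an acceptable store; the file layout, the lastSyncMs key, and the function names are all hypothetical. The overlap_ms parameter implements the overlap window by rewinding the loaded watermark, and the write goes through a temporary file so a crash cannot leave a half-written watermark behind.

```python
import json
import os
import tempfile


def load_watermark(path, overlap_ms=0):
    """Return the saved watermark minus an overlap window; 0 if none is saved."""
    try:
        with open(path, encoding="utf-8") as handle:
            saved = int(json.load(handle)["lastSyncMs"])
    except FileNotFoundError:
        return 0  # no previous run: do a full export
    return max(saved - overlap_ms, 0)


def save_watermark(path, watermark_ms):
    """Atomically replace the watermark file with the new value."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        json.dump({"lastSyncMs": int(watermark_ms)}, handle)
    os.replace(tmp_path, path)  # atomic rename on the same filesystem
```

With an overlap window, some products are emitted again on the next run, so the downstream consumer needs to tolerate or deduplicate repeats.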

Bulk update product attributes and poll job status

Apply an attribute value to many products with the bulk endpoint, then, for any follow-up operation that runs asynchronously, poll its job status until it completes. The bulk add itself is synchronous and returns 204; the polling helper is shown for asynchronous operations that return a task ID.

import hashlib
import json
import uuid
from typing import Any


def request_fingerprint(payload: dict[str, Any]) -> str:
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


if __name__ == "__main__":
    client = BluestoneClient()

    attribute_definition_id = "YOUR_ATTRIBUTE_DEFINITION_ID"
    product_ids = ["PRODUCT_ID_1", "PRODUCT_ID_2"]
    attribute_values = ["AI reviewed"]

    payload = {
        "attributeDefinitionId": attribute_definition_id,
        "attributeValues": attribute_values,
        "productIds": product_ids,
    }
    client_job_id = str(uuid.uuid4())
    fingerprint = request_fingerprint(payload)

    print(json.dumps({
        "clientJobId": client_job_id,
        "fingerprint": fingerprint,
        "operation": "addAttributesByIds",
    }))

    bulk_add_attribute(
        client,
        attribute_definition_id=attribute_definition_id,
        attribute_values=attribute_values,
        product_ids=product_ids,
    )
    print("Bulk attribute update applied.")

    # If a later step starts an asynchronous job that returns a task ID,
    # poll it to completion instead of re-issuing the operation.
    task_id = "TASK_ID_FROM_AN_ASYNC_OPERATION"
    if task_id != "TASK_ID_FROM_AN_ASYNC_OPERATION":
        result = poll_job_status(client, task_id)
        print(f"Async job completed, {result.get('count')} items processed.")

Because server-side idempotency keys are not exposed, persist the clientJobId and request fingerprint before sending the write. If the process crashes, the agent can check its own job log before retrying.
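A job log that is written before the request is sent might look like the sketch below. It assumes an append-only JSON Lines file is sufficient; the file format, the pending and done state names, and both function names are hypothetical choices for this example.

```python
import json
import os


def append_job_record(path, client_job_id, fingerprint, state):
    """Append one JSON line recording a job's state ("pending" or "done")."""
    record = {
        "clientJobId": client_job_id,
        "fingerprint": fingerprint,
        "state": state,
    }
    with open(path, "a", encoding="utf-8") as handle:
        handle.write(json.dumps(record, sort_keys=True) + "\n")
        handle.flush()
        os.fsync(handle.fileno())  # push the record to disk before the write is sent


def load_job_states(path):
    """Return {fingerprint: latest state}; later lines override earlier ones."""
    states = {}
    if not os.path.exists(path):
        return states
    with open(path, encoding="utf-8") as handle:
        for line in handle:
            if line.strip():
                record = json.loads(line)
                states[record["fingerprint"]] = record["state"]
    return states
```

Record "pending" before the bulk call and "done" after it returns. On restart, a fingerprint still marked pending means the write may or may not have reached the server, so re-apply it only if the operation is safe to repeat.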

Validate product data before writing

Bluestone PIM's completeness score exposes validation issues per product and context. Use it to validate before or after a write so an agent can catch data problems early instead of shipping invalid content.

def validate_product(client: BluestoneClient, product_id: str, context: str = "en") -> list:
    """Return validation issues for a product in a given context."""
    return client.request(
        "GET",
        f"/completeness-score/validations/{product_id}/{context}",
    )


if __name__ == "__main__":
    client = BluestoneClient()
    issues = validate_product(client, "YOUR_PRODUCT_ID", context="en")
    if issues:
        print("Validation issues found; do not publish:", issues)
    else:
        print("Product passes validation.")

See Get validation issues for product for the response shape. Validation does not replace schema discovery; use validation checks together with attribute definitions, allowed values, required contexts, and the OpenAPI request schema.

Subscribe to product changes

Create a webhook and subscribe it to the product events the agent cares about, so changes are pushed instead of polled. Set up the subscription once with cURL:

BLUESTONE_AUTH_BASE="https://idp.test.bluestonepim.com/op/token"
BLUESTONE_API_BASE="https://api.test.bluestonepim.com"
CLIENT_ID="YOUR_CLIENT_ID"
CLIENT_SECRET="YOUR_CLIENT_SECRET"
WEBHOOK_URL="https://example.ngrok-free.app/webhooks/bluestone"
WEBHOOK_SECRET="LongAndSecretPassword"

ACCESS_TOKEN=$(
  curl -sS -X POST "$BLUESTONE_AUTH_BASE" \
    -H "Content-Type: application/x-www-form-urlencoded" \
    --data-urlencode "grant_type=client_credentials" \
    --data-urlencode "client_id=$CLIENT_ID" \
    --data-urlencode "client_secret=$CLIENT_SECRET" | jq -r ".access_token"
)

# 1. Create the webhook. The response returns an id.
WEBHOOK_ID=$(
  curl -sS -X POST "$BLUESTONE_API_BASE/notification-external/webhooks" \
    -H "Authorization: Bearer $ACCESS_TOKEN" \
    -H "Content-Type: application/json" \
    -d "{\"active\": true, \"url\": \"$WEBHOOK_URL\", \"secret\": \"$WEBHOOK_SECRET\"}" | jq -r ".id"
)

# 2. Subscribe the webhook to product change events.
curl -sS -X PUT "$BLUESTONE_API_BASE/notification-external/subscriptions/webhook/$WEBHOOK_ID/events" \
  -H "Authorization: Bearer $ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "eventTypes": [
      "PRODUCT_CREATED",
      "PRODUCT_WATCH_ATTRIBUTE_UPDATE_VALUE",
      "PRODUCT_SYNC_DONE"
    ]
  }'

Then receive and verify the events. The Webhook events and delivery guide has complete signature-verifying receivers for Node/TypeScript with Express and Python with FastAPI. The key steps are to verify the x-bs-signature HMAC-SHA256 header against the raw body, deduplicate processed events, and trigger a Management API read for each changed entityId.

Relevant references: Create webhook and Subscribe for given events for given webhook.

For AI agents

The Bluestone PIM API is built for automation. An agent can discover and drive it end to end:

  1. Discover the API with the docs MCP server (list-specs, search-endpoints, get-endpoint) or the published OpenAPI specs.
  2. Authenticate with OAuth2 client credentials and cache the token.
  3. Learn the schema - attribute definitions, contexts, catalogs, product types, and request schemas - before reading or writing.
  4. Read large datasets with cursor pagination and write at scale with bulk operations.
  5. Poll async task / job status for long-running work, and retry transient failures with backoff.
  6. Make writes safe to retry with client-side idempotency.
  7. React to changes with webhooks instead of polling.
  8. Do all of the above in the test environment before production.

Machine-readable OpenAPI specs are published per component, and an LLM index is available at /llms.txt.

Reliability checklist

  1. Token is cached and refreshed before expiry; it is not re-requested per call.
  2. Schema is discovered, not hard-coded: attribute definitions, contexts, catalogs, product metadata, and OpenAPI request schemas.
  3. Reads of large datasets use cursor pagination with count up to 100.
  4. Writes to many products use bulk operations, batched at 100 IDs.
  5. Asynchronous operations are tracked with async task / job status polling and backoff.
  6. 429, transient 5xx, and network errors are retried with exponential backoff and jitter; other 4xx errors are not.
  7. Writes are safe to retry through client-side idempotency, since server-side idempotency keys are not available.
  8. Change-driven workflows use webhooks with x-bs-signature verification instead of polling.
  9. Request payload hashes, product IDs, endpoint names, and agent run IDs are logged for audit and deduplication.
  10. The whole workflow is validated in the test environment before it runs against production.