Build reliable AI agents on Bluestone PIM
This guide shows how to build AI agents and automation on top of the Bluestone PIM Management API. It combines the API's existing primitives - OAuth2, cursor pagination, bulk operations, async task polling, webhooks, a test environment, and machine-readable OpenAPI specs - into a small set of patterns you can compose into reliable workflows.
An agent here means any automated client that authenticates once and then reads, writes, and reacts to PIM data without a human in the loop: a sync job, an LLM tool-use loop, a data pipeline, or a scheduled enrichment task. The patterns below are written so the same automation behaves predictably whether it processes ten products or a hundred thousand.
This guide builds on the dedicated guides for each primitive. Read them when you need depth:
- Generate SDKs and client code from OpenAPI - typed clients and the OpenAPI specs.
- Rate limits, retries, and backoff - 429 handling, backoff, and burst behavior.
- Webhook events and delivery - event types, payloads, delivery, and signature verification.
- Use the Bluestone PIM docs MCP server - let an agent discover the API directly.
Patterns at a glance
| Pattern | Use it for | API feature | Reference |
|---|---|---|---|
| Authenticate once, cache token | Avoid re-authenticating on every call | OAuth2 client credentials | Generate token |
| Discover schema | Learn attributes, contexts, catalogs, product types, and request shapes before reading or writing | Definitions, contexts, catalogs, OpenAPI | Find all attribute definitions |
| Read large datasets | Export or scan many products without page drift | Cursor pagination | Get products using cursor with views |
| Update many products | Apply the same change to many products in fewer calls | Bulk operations | Add attribute values in given set of products |
| Track long-running work | Wait for an async job without re-issuing it | Async task status | Get async task status |
| Survive transient failures | Recover from 429, 5xx, and network errors | Retry with backoff and jitter | Rate limits, retries, and backoff |
| Write safely | Avoid applying the same change twice on retry | Client-side deduplication | This guide |
| React to changes | Replace polling with push | Webhooks | Webhook events and delivery |
| Test before production | Validate automation against non-production data | Test environment | This guide |
Choose an environment: use the test environment first
Develop and validate every agent against the test environment before pointing it at production data. Test credentials are separate from production credentials, and a write in test cannot affect production.
See Environments for API and auth base URLs.
Each component is mounted on a path under the base URL, for example /pim, /metadata, /completeness-score, /notification-external, and /global-settings. The examples below use those prefixes.
Key endpoints and specs
| Use case | Method and path | Reference |
|---|---|---|
| Generate OAuth2 bearer token | POST /op/token | Generate token |
| Read products with cursor pagination and views | POST /pim/products/cursor/views/all | Get products using cursor with views |
| Add attribute values to many products | POST /pim/products/attributes/add/by-ids | Add attribute values in given set of products |
| Poll PIM async task status | GET /pim/async/status/{taskId} | Get async task status |
| Poll metadata async task status | GET /metadata/async/task/{taskId} | Get task status |
| Discover attribute definitions | GET /pim/definitions | Find all attribute definitions |
| Discover catalogs | GET /pim/catalogs | List catalogs |
| Discover contexts | GET /global-settings/context | Global settings OpenAPI |
| Validate completeness issues | GET /completeness-score/validations/{entityId}/{context} | Get validation issues |
| Create a webhook | POST /notification-external/webhooks | Create webhook |
| Subscribe a webhook to events | PUT /notification-external/subscriptions/webhook/{webhookId}/events | Subscribe for given events for given webhook |
OpenAPI specs are available as JSON and can be used by agents for planning, validation, and generated clients:
- PIM OpenAPI
- Metadata OpenAPI
- Completeness score OpenAPI
- External notifications OpenAPI
- Global settings OpenAPI
An LLM index is available at /llms.txt. AI tools can also connect to the Bluestone PIM docs MCP server to list specs, search endpoints, inspect request schemas, and run requests with your own bearer token.
A reusable client foundation
Most of the patterns share the same foundation: cache the token, retry transient failures, and switch environments with two constants. The examples in this guide use the Python client below. A Node/TypeScript version follows for teams building JavaScript agents.
Python
Set BLUESTONE_API_BASE and BLUESTONE_AUTH_BASE from the Environments page.
```python
import random
import time
from typing import Any

import requests

# Choose an environment by setting these two constants. Start with Test.
BLUESTONE_AUTH_BASE = "https://idp.test.bluestonepim.com/op/token"
BLUESTONE_API_BASE = "https://api.test.bluestonepim.com"
CLIENT_ID = "YOUR_CLIENT_ID"
CLIENT_SECRET = "YOUR_CLIENT_SECRET"

RETRY_STATUSES = {429, 500, 502, 503, 504}
MAX_RETRIES = 5
TIMEOUT_SECONDS = 30


class BluestoneClient:
    """Minimal Management API client for agents and automation."""

    def __init__(
        self,
        auth_url: str = BLUESTONE_AUTH_BASE,
        base_url: str = BLUESTONE_API_BASE,
        client_id: str = CLIENT_ID,
        client_secret: str = CLIENT_SECRET,
    ):
        self._auth_url = auth_url
        self._base_url = base_url.rstrip("/")
        self._client_id = client_id
        self._client_secret = client_secret
        self._token: str | None = None
        self._token_expiry = 0.0

    def _refresh_token(self) -> None:
        response = requests.post(
            self._auth_url,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "grant_type": "client_credentials",
                "client_id": self._client_id,
                "client_secret": self._client_secret,
            },
            timeout=TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        payload = response.json()
        self._token = payload["access_token"]
        expires_in = int(payload.get("expires_in", 3600))
        # Refresh a minute early so a token never expires mid-request.
        self._token_expiry = time.monotonic() + max(expires_in - 60, 0)

    def _auth_header(self) -> str:
        if self._token is None or time.monotonic() >= self._token_expiry:
            self._refresh_token()
        return f"Bearer {self._token}"

    @staticmethod
    def _retry_delay(attempt: int) -> float:
        return min(2 ** attempt + random.uniform(0, 0.25), 30)

    def request(
        self,
        method: str,
        path: str,
        json_body: Any = None,
        params: dict[str, Any] | None = None,
    ) -> Any:
        url = f"{self._base_url}/{path.lstrip('/')}"
        refreshed_on_401 = False
        for attempt in range(MAX_RETRIES):
            headers = {
                "Authorization": self._auth_header(),
                "Content-Type": "application/json",
                "context-fallback": "true",
            }
            try:
                response = requests.request(
                    method,
                    url,
                    headers=headers,
                    json=json_body,
                    params=params,
                    timeout=TIMEOUT_SECONDS,
                )
                if response.status_code == 401 and not refreshed_on_401:
                    refreshed_on_401 = True
                    self._refresh_token()
                    continue
                if response.status_code not in RETRY_STATUSES:
                    response.raise_for_status()
                    return response.json() if response.content else None
                if attempt == MAX_RETRIES - 1:
                    response.raise_for_status()
                time.sleep(self._retry_delay(attempt))
            except (requests.ConnectionError, requests.Timeout):
                if attempt == MAX_RETRIES - 1:
                    raise
                time.sleep(self._retry_delay(attempt))
        raise RuntimeError("Request failed after retries")
```
Node/TypeScript
Set BLUESTONE_API_BASE and BLUESTONE_AUTH_BASE from the Environments page.
```typescript
const BLUESTONE_AUTH_BASE = "https://idp.test.bluestonepim.com/op/token";
const BLUESTONE_API_BASE = "https://api.test.bluestonepim.com";
const CLIENT_ID = "YOUR_CLIENT_ID";
const CLIENT_SECRET = "YOUR_CLIENT_SECRET";

const RETRY_STATUSES = new Set([429, 500, 502, 503, 504]);
const MAX_RETRIES = 5;

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
const retryDelayMs = (attempt: number) =>
  Math.min(2 ** attempt * 1000 + Math.floor(Math.random() * 250), 30000);

export class BluestoneClient {
  private token: string | null = null;
  private tokenExpiry = 0;

  private async refreshToken(): Promise<void> {
    const body = new URLSearchParams({
      grant_type: "client_credentials",
      client_id: CLIENT_ID,
      client_secret: CLIENT_SECRET,
    });
    const res = await fetch(BLUESTONE_AUTH_BASE, {
      method: "POST",
      headers: { "Content-Type": "application/x-www-form-urlencoded" },
      body,
    });
    if (!res.ok) throw new Error(`Token request failed: ${res.status}`);
    const json = await res.json();
    this.token = json.access_token;
    // Refresh a minute early so a token never expires mid-request.
    this.tokenExpiry =
      Date.now() + Math.max((json.expires_in ?? 3600) - 60, 0) * 1000;
  }

  private async authHeader(): Promise<string> {
    if (!this.token || Date.now() >= this.tokenExpiry) await this.refreshToken();
    return `Bearer ${this.token}`;
  }

  async request<T>(method: string, path: string, body?: unknown): Promise<T> {
    const url = `${BLUESTONE_API_BASE}/${path.replace(/^\/+/, "")}`;
    let refreshedOn401 = false;
    for (let attempt = 0; attempt < MAX_RETRIES; attempt += 1) {
      try {
        const res = await fetch(url, {
          method,
          headers: {
            Authorization: await this.authHeader(),
            "Content-Type": "application/json",
            "context-fallback": "true",
          },
          body: body === undefined ? undefined : JSON.stringify(body),
        });
        if (res.status === 401 && !refreshedOn401) {
          refreshedOn401 = true;
          await this.refreshToken();
          continue;
        }
        if (!RETRY_STATUSES.has(res.status)) {
          if (!res.ok) throw new Error(`API request failed: ${res.status}`);
          return res.status === 204 ? (undefined as T) : ((await res.json()) as T);
        }
        if (attempt === MAX_RETRIES - 1) {
          throw new Error(`Failed after retries: ${res.status}`);
        }
        await sleep(retryDelayMs(attempt));
      } catch (error) {
        // fetch reports network failures as TypeError; rethrow everything else.
        if (!(error instanceof TypeError) || attempt === MAX_RETRIES - 1) {
          throw error;
        }
        await sleep(retryDelayMs(attempt));
      }
    }
    throw new Error("Request failed after retries");
  }
}
```
Pattern 1: Authenticate once, cache the token, refresh safely
The Management API uses OAuth2 client credentials. Request a token from the token URL for your environment, then reuse it until shortly before it expires. The token response includes expires_in in seconds; refresh a minute early so a long-running job never sends an expired token mid-flight.
Do not request a new token per API call. Re-authenticating on every request puts unnecessary load on the token endpoint and adds latency to every operation. The BluestoneClient above caches the token and refreshes it on demand, including a single forced refresh if the API unexpectedly returns 401.
Treat the client secret and access token as secrets. Use test credentials while developing, and never paste production credentials into a shared chat transcript or agent prompt. An agent can discover the token flow through the get-bearer-token spec on the docs MCP server.
Pattern 2: Discover the schema before reading or writing
Before an agent reads or writes product data, let it learn the shape of the catalog instead of hard-coding IDs. Bluestone PIM exposes the schema through dedicated endpoints and through machine-readable OpenAPI specs.
| What to discover | Endpoint | Reference |
|---|---|---|
| Attribute definitions | GET /pim/definitions | Find all attribute definitions |
| Catalogs and categories | GET /pim/catalogs | List catalogs |
| Contexts | GET /global-settings/context | Global settings OpenAPI |
| Product metadata | metadata on product views | Get products using cursor with views |
| The full API surface | OpenAPI JSON per component | Generate SDKs and client code from OpenAPI |
```python
client = BluestoneClient()

# Attribute definitions: map attribute name to definition ID for later writes.
definitions = client.request("GET", "/pim/definitions")

# Contexts: use this before writing localized/context-specific values.
contexts = client.request("GET", "/global-settings/context")

# Catalogs: the category trees available to read and write against.
catalogs = client.request("GET", "/pim/catalogs")
```
For schema discovery from inside an editor or LLM tool-use loop, the docs MCP server exposes list-specs, list-endpoints, search-endpoints, and get-endpoint so the agent can find the right operation, parameters, and request body without a human pasting endpoint docs.
Pattern 3: Read large datasets with cursor pagination
Use cursor pagination to read large result sets without the page-number drift that happens when data changes between pages. Send count from 1 to 100 and the nextCursor from the previous response. Request only the views you need, for example {"type": "METADATA"} when you only need product metadata.
The response envelope is { "data": [...], "nextCursor": "..." }. Keep paging until data is empty or nextCursor is absent.
```python
def scan_products(client: BluestoneClient, views=("METADATA",), count=100):
    """Yield every product, one page at a time, using cursor pagination."""
    cursor = None
    while True:
        body = {
            "count": count,
            "views": [{"type": view} for view in views],
        }
        if cursor:
            body["cursor"] = cursor
        page = client.request("POST", "/pim/products/cursor/views/all", json_body=body)
        data = page.get("data", [])
        for product in data:
            yield product
        cursor = page.get("nextCursor")
        if not cursor or not data:
            break
```
Cursor pagination also reduces rate-limit pressure: each call returns up to 100 products, so a full scan costs far fewer requests than fetching products one by one. See Rate limits, retries, and backoff for the request-pacing rules a scan should stay under.
Pattern 4: Update many products with bulk operations
When an agent applies the same change to many products, use a bulk endpoint instead of one request per product. For example, Add attribute values in given set of products sets one attribute definition's values on up to 100 product IDs in a single call and returns 204 No Content on success.
```python
def bulk_add_attribute(client, attribute_definition_id, attribute_values, product_ids):
    """Add attribute values to up to 100 products per call. Returns nothing on 204."""
    for start in range(0, len(product_ids), 100):
        batch = product_ids[start:start + 100]
        client.request(
            "POST",
            "/pim/products/attributes/add/by-ids",
            json_body={
                "attributeDefinitionId": attribute_definition_id,
                "attributeValues": attribute_values,
                "productIds": batch,
            },
        )
```
Batch the work yourself when you have more than 100 IDs, as shown above. Bulk endpoints turn N writes into ceil(N / 100) requests, which is the biggest lever for staying inside the rate limit during a large automation run.
Pattern 5: Track long-running work with async task polling
Some operations are asynchronous: they accept the request, return quickly with a task ID, and do the work in the background. Poll the task status endpoint until the job reaches a terminal state instead of re-issuing the original operation.
- PIM async tasks: GET /pim/async/status/{taskId} - Get async task status. The response includes fields such as status, count, and errorMessage; known statuses include WAITING and COMPLETED.
- Metadata async tasks: GET /metadata/async/task/{taskId} - Get task status.
Poll with backoff. Start with a short delay and increase it while the task is still in progress; never poll in a tight loop.
```python
def poll_job_status(
    client: BluestoneClient,
    task_id: str,
    poll_interval=2.0,
    max_interval=30.0,
    timeout=600.0,
) -> dict:
    """Poll a PIM async task until it completes, reports an error, or times out."""
    deadline = time.monotonic() + timeout
    delay = poll_interval
    while time.monotonic() < deadline:
        status = client.request("GET", f"/pim/async/status/{task_id}")
        state = status.get("status")
        if state == "COMPLETED":
            return status
        if status.get("errorMessage"):
            raise RuntimeError(f"Job {task_id} failed: {status['errorMessage']}")
        time.sleep(delay)
        # Double the wait after each poll, up to max_interval.
        delay = min(delay * 2, max_interval)
    raise TimeoutError(f"Job {task_id} did not complete within {timeout}s")
```
Use this helper for any operation that returns a task ID and 202 Accepted. Synchronous endpoints, such as the bulk attribute add in Pattern 4, return their result or 204 directly and need no polling.
Pattern 6: Handle 429, 5xx, and network errors
Any agent that runs long enough will hit a transient failure. Retry 429, transient 5xx (500, 502, 503, 504), and network errors with exponential backoff and jitter, as the BluestoneClient does. Do not retry 4xx errors other than 429; a 400, 401, 403, 404, or 409 will not succeed on retry and signals a bug, permissions issue, or data problem to fix.
Rate limits are applied per customer organization and shared across all of that organization's clients and tokens, so creating more OAuth clients does not raise your ceiling. Bluestone PIM returns 429 when you exceed the limit but does not return Retry-After or X-RateLimit-* headers for the per-second throttle. Because there is no header to read, use a default backoff schedule that starts around 1 second and grows exponentially with jitter. The full rules, including burst behavior, are in Rate limits, retries, and backoff.
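As a rough illustration of a backoff schedule that starts around 1 second and grows with jitter, the sketch below computes the wait before each retry. The function name backoff_delay and its base, cap, and jitter parameters are assumptions chosen to mirror the _retry_delay helper in BluestoneClient; they are not values prescribed by the API.

```python
import random


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 30.0,
    jitter: float = 0.25,
) -> float:
    """Seconds to wait before retry number `attempt` (0-based).

    Hypothetical helper: base, cap, and jitter are illustrative defaults.
    """
    # Exponential growth (1s, 2s, 4s, ...) plus a small random offset so that
    # several clients throttled at the same moment do not retry in lockstep.
    return min(base * 2 ** attempt + random.uniform(0, jitter), cap)


if __name__ == "__main__":
    for attempt in range(6):
        print(f"attempt {attempt}: wait {backoff_delay(attempt):.2f}s")
```

The cap keeps a long outage from producing multi-minute sleeps, and the jitter matters most when many workers share one organization-wide limit.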
Pattern 7: Idempotency and safe writes
Bluestone PIM does not currently expose server-side idempotency keys for write operations, so a retried write is not automatically de-duplicated by the server. Make your own writes safe to retry:
- Prefer naturally idempotent operations. Setting an attribute to a known value, or upserting by a stable product number, produces the same result whether it runs once or twice. Favor these over operations whose effect compounds on repetition.
- Track work with a deterministic client-side job ID. Derive an idempotency key from the logical unit of work, for example a hash of sync run + product ID + attribute, record it once the write succeeds, and skip it if you see it again.
- Make bulk batches replayable. Because a bulk call can succeed server-side even if your process dies before reading the response, design batches so re-applying them is harmless, and key your dedup store on the batch rather than the whole job.
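One way to derive the deterministic key described above is to hash the fields that identify the logical write. This is a minimal sketch: the function name make_job_key and the choice of fields (a sync run ID, a product ID, and an attribute definition ID) are assumptions for illustration, not part of the Bluestone PIM API.

```python
import hashlib


def make_job_key(sync_run_id: str, product_id: str, attribute_definition_id: str) -> str:
    """Return a stable key for one logical write (hypothetical helper)."""
    parts = (sync_run_id, product_id, attribute_definition_id)
    # Length-prefix each part so ("ab", "c", "d") and ("a", "bc", "d")
    # cannot produce the same canonical string.
    canonical = "|".join(f"{len(part)}:{part}" for part in parts)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


if __name__ == "__main__":
    key = make_job_key("sync-2024-06-01", "PRODUCT_ID_1", "ATTRIBUTE_DEFINITION_ID")
    print(key)
```

Because the key depends only on its inputs, a restarted process computes the same key for the same write and can recognize work it has already done.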
```python
class IdempotentWriter:
    """Apply each logical write at most once, even across retries and restarts."""

    def __init__(self, client: BluestoneClient):
        self._client = client
        self._done: set[str] = set()  # Back this with a database for cross-run safety.

    def write_once(self, job_key: str, method: str, path: str, json_body) -> None:
        if job_key in self._done:
            return
        self._client.request(method, path, json_body=json_body)
        self._done.add(job_key)
```
This is recommended client behavior, not an implemented server-side idempotency feature. Keep that distinction clear in agent prompts, implementation notes, and operator-facing logs.
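The in-memory set in IdempotentWriter is lost on restart. As a sketch of what a durable replacement could look like, the class below keeps completed job keys in SQLite using Python's standard sqlite3 module. The class name SqliteDoneStore, the table name done_jobs, and the default file name are all hypothetical; any database with a unique key constraint would serve the same purpose.

```python
import sqlite3


class SqliteDoneStore:
    """Durable set of completed job keys (hypothetical helper, not a PIM feature)."""

    def __init__(self, path: str = "agent_jobs.db"):
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS done_jobs (job_key TEXT PRIMARY KEY)"
        )
        self._conn.commit()

    def __contains__(self, job_key: str) -> bool:
        row = self._conn.execute(
            "SELECT 1 FROM done_jobs WHERE job_key = ?", (job_key,)
        ).fetchone()
        return row is not None

    def add(self, job_key: str) -> None:
        # INSERT OR IGNORE makes recording the same key twice harmless.
        self._conn.execute(
            "INSERT OR IGNORE INTO done_jobs (job_key) VALUES (?)", (job_key,)
        )
        self._conn.commit()
```

Because it supports the in operator and an add method, an instance could stand in for the set used by IdempotentWriter. Note that a crash between the API call and add still leaves one write unrecorded, which is why naturally idempotent operations remain the first line of defense.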
Pattern 8: Use webhooks instead of polling where possible
Polling for changes wastes requests and adds latency. Where a workflow reacts to changes, subscribe to webhooks so Bluestone PIM pushes events to you. Use the event as a signal that something changed, then read current state from the Management API for the affected entity.
Webhook delivery order is not guaranteed, and the payload is a change signal rather than a full snapshot. An agent should fetch the current entity state when order or completeness matters. Webhook messages are signed with HMAC-SHA256 in the x-bs-signature header; verify the signature against the raw request body before processing. See Webhook events and delivery for event types, payloads, retry behavior, and complete verified receiver examples.
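The verification step can be sketched with Python's standard hmac module. This sketch assumes the x-bs-signature header carries the hex-encoded HMAC-SHA256 digest of the raw body, keyed with the webhook secret; that encoding is an assumption, so confirm it against the Webhook events and delivery guide before relying on it. The function name signature_is_valid is hypothetical.

```python
import hashlib
import hmac


def signature_is_valid(raw_body: bytes, header_value: str, secret: str) -> bool:
    """Check an x-bs-signature value against the raw request body.

    Assumes a hex-encoded HMAC-SHA256 digest; the real encoding may differ.
    """
    expected = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()
    received = header_value.strip().lower()
    # compare_digest runs in constant time, which avoids leaking how many
    # leading characters of the signature matched.
    return hmac.compare_digest(expected.encode("utf-8"), received.encode("utf-8"))
```

Verify against the bytes exactly as received. Parsing the JSON and re-serializing it first can change whitespace or key order and make a valid signature fail.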
Worked examples
The examples reuse BluestoneClient and the helpers above. Replace the credentials, IDs, and the BLUESTONE_AUTH_BASE and BLUESTONE_API_BASE constants for your environment. Keep them on the test environment until the workflow is proven.
Export products changed since last sync
Cursor pagination has no native "modified since" filter, but the METADATA view returns each product's lastUpdate timestamp as epoch milliseconds. Scan with the cursor, keep a watermark from the previous run, and emit only the products that changed since then. For a low-volume, near-real-time alternative, drive this from webhooks instead of scanning.
```python
def export_changed_products(client: BluestoneClient, last_sync_ms: int):
    """Yield products whose metadata.lastUpdate is newer than the last sync watermark."""
    for product in scan_products(client, views=("METADATA",)):
        metadata = product.get("metadata") or {}
        last_update = metadata.get("lastUpdate")
        if last_update and int(last_update) > last_sync_ms:
            yield product


if __name__ == "__main__":
    client = BluestoneClient()
    last_sync_ms = 0  # Load the saved watermark; 0 means "full export".
    changed = list(export_changed_products(client, last_sync_ms))
    print(f"{len(changed)} products changed since {last_sync_ms}")
    for product in changed:
        meta = product.get("metadata", {})
        print(product.get("id"), meta.get("number"), meta.get("type"), meta.get("lastUpdate"))

    # Advance the watermark to the newest change seen, then persist it.
    new_watermark = max(
        (int((p.get("metadata") or {}).get("lastUpdate", 0)) for p in changed),
        default=last_sync_ms,
    )
    print(f"Next watermark: {new_watermark}")
```
Store the highest processed metadata.lastUpdate after a successful run. If two products share the same timestamp, keep a small overlap window or store processed product IDs for that timestamp so the next run can deduplicate safely.
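The tie-handling advice can be sketched as a pure function that carries two pieces of state between runs: the watermark and the product IDs already processed at exactly that timestamp. The function name select_changed and the state layout are assumptions for illustration; the product shape (id and metadata.lastUpdate) follows the example above.

```python
from typing import Iterable


def select_changed(
    products: Iterable[dict],
    watermark_ms: int,
    seen_ids: set[str],
) -> tuple[list[dict], int, set[str]]:
    """Return (changed products, next watermark, IDs seen at the next watermark).

    Hypothetical helper: seen_ids holds IDs already processed at watermark_ms.
    """
    changed: list[dict] = []
    new_watermark = watermark_ms
    new_seen = set(seen_ids)
    for product in products:
        last_update = (product.get("metadata") or {}).get("lastUpdate")
        if last_update is None:
            continue
        ts = int(last_update)
        pid = product.get("id")
        # Skip anything older than the watermark, and anything at the
        # watermark that a previous run already handled.
        if ts < watermark_ms or (ts == watermark_ms and pid in seen_ids):
            continue
        changed.append(product)
        if ts > new_watermark:
            new_watermark, new_seen = ts, {pid}
        elif ts == new_watermark:
            new_seen.add(pid)
    return changed, new_watermark, new_seen
```

Persist both the returned watermark and the ID set after a successful run. Comparing with "at or after the watermark" instead of "strictly after" is what lets a product that shares the watermark timestamp, but arrived after the last run, still be picked up.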
Bulk update product attributes and poll job status
Apply an attribute value to many products with the bulk endpoint, then, for any follow-up operation that runs asynchronously, poll its job status until it completes. The bulk add itself is synchronous and returns 204; the polling helper is shown for asynchronous operations that return a task ID.
```python
import hashlib
import json
import uuid


def request_fingerprint(payload: dict[str, Any]) -> str:
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


if __name__ == "__main__":
    client = BluestoneClient()
    attribute_definition_id = "YOUR_ATTRIBUTE_DEFINITION_ID"
    product_ids = ["PRODUCT_ID_1", "PRODUCT_ID_2"]
    attribute_values = ["AI reviewed"]

    payload = {
        "attributeDefinitionId": attribute_definition_id,
        "attributeValues": attribute_values,
        "productIds": product_ids,
    }
    client_job_id = str(uuid.uuid4())
    fingerprint = request_fingerprint(payload)
    print(json.dumps({
        "clientJobId": client_job_id,
        "fingerprint": fingerprint,
        "operation": "addAttributesByIds",
    }))

    bulk_add_attribute(
        client,
        attribute_definition_id=attribute_definition_id,
        attribute_values=attribute_values,
        product_ids=product_ids,
    )
    print("Bulk attribute update applied.")

    # If a later step starts an asynchronous job that returns a task ID,
    # poll it to completion instead of re-issuing the operation.
    task_id = "TASK_ID_FROM_AN_ASYNC_OPERATION"
    if task_id != "TASK_ID_FROM_AN_ASYNC_OPERATION":
        result = poll_job_status(client, task_id)
        print(f"Async job completed, {result.get('count')} items processed.")
```
Because server-side idempotency keys are not exposed, persist the clientJobId and request fingerprint before sending the write. If the process crashes, the agent can check its own job log before retrying.
Validate product data before writing
Bluestone PIM's completeness score exposes validation issues per product and context. Use it to validate before or after a write so an agent can catch data problems early instead of shipping invalid content.
```python
def validate_product(client: BluestoneClient, product_id: str, context: str = "en") -> list:
    """Return validation issues for a product in a given context."""
    return client.request(
        "GET",
        f"/completeness-score/validations/{product_id}/{context}",
    )


if __name__ == "__main__":
    client = BluestoneClient()
    issues = validate_product(client, "YOUR_PRODUCT_ID", context="en")
    if issues:
        print("Validation issues found; do not publish:", issues)
    else:
        print("Product passes validation.")
```
See Get validation issues for product for the response shape. Validation does not replace schema discovery; use validation checks together with attribute definitions, allowed values, required contexts, and the OpenAPI request schema.
Subscribe to product changes
Create a webhook and subscribe it to the product events the agent cares about, so changes are pushed instead of polled. Set up the subscription once with cURL:
```shell
BLUESTONE_AUTH_BASE="https://idp.test.bluestonepim.com/op/token"
BLUESTONE_API_BASE="https://api.test.bluestonepim.com"
CLIENT_ID="YOUR_CLIENT_ID"
CLIENT_SECRET="YOUR_CLIENT_SECRET"
WEBHOOK_URL="https://example.ngrok-free.app/webhooks/bluestone"
WEBHOOK_SECRET="LongAndSecretPassword"

ACCESS_TOKEN=$(
  curl -sS -X POST "$BLUESTONE_AUTH_BASE" \
    -H "Content-Type: application/x-www-form-urlencoded" \
    --data-urlencode "grant_type=client_credentials" \
    --data-urlencode "client_id=$CLIENT_ID" \
    --data-urlencode "client_secret=$CLIENT_SECRET" | jq -r ".access_token"
)

# 1. Create the webhook. The response returns an id.
WEBHOOK_ID=$(
  curl -sS -X POST "$BLUESTONE_API_BASE/notification-external/webhooks" \
    -H "Authorization: Bearer $ACCESS_TOKEN" \
    -H "Content-Type: application/json" \
    -d "{\"active\": true, \"url\": \"$WEBHOOK_URL\", \"secret\": \"$WEBHOOK_SECRET\"}" | jq -r ".id"
)

# 2. Subscribe the webhook to product change events.
curl -sS -X PUT "$BLUESTONE_API_BASE/notification-external/subscriptions/webhook/$WEBHOOK_ID/events" \
  -H "Authorization: Bearer $ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "eventTypes": [
      "PRODUCT_CREATED",
      "PRODUCT_WATCH_ATTRIBUTE_UPDATE_VALUE",
      "PRODUCT_SYNC_DONE"
    ]
  }'
```
Then receive and verify the events. The Webhook events and delivery guide has complete signature-verifying receivers for Node/TypeScript with Express and Python with FastAPI. The key step is to verify the x-bs-signature HMAC-SHA256 header against the raw body, deduplicate processed events, and trigger a Management API read for each changed entityId.
Relevant references: Create webhook and Subscribe for given events for given webhook.
For AI agents
The Bluestone PIM API is built for automation. An agent can discover and drive it end to end:
- Discover the API with the docs MCP server (list-specs, search-endpoints, get-endpoint) or the published OpenAPI specs.
- Authenticate with OAuth2 client credentials and cache the token.
- Learn the schema - attribute definitions, contexts, catalogs, product types, and request schemas - before reading or writing.
- Read large datasets with cursor pagination and write at scale with bulk operations.
- Poll async task / job status for long-running work, and retry transient failures with backoff.
- Make writes safe to retry with client-side idempotency.
- React to changes with webhooks instead of polling.
- Do all of the above in the test environment before production.
Machine-readable OpenAPI specs are published per component, and an LLM index is available at /llms.txt.
Reliability checklist
- Token is cached and refreshed before expiry; it is not re-requested per call.
- Schema is discovered, not hard-coded: attribute definitions, contexts, catalogs, product metadata, and OpenAPI request schemas.
- Reads of large datasets use cursor pagination with count up to 100.
- Writes to many products use bulk operations, batched at 100 IDs.
- Asynchronous operations are tracked with async task / job status polling and backoff.
- 429, transient 5xx, and network errors are retried with exponential backoff and jitter; other 4xx errors are not.
- Writes are safe to retry through client-side idempotency, since server-side idempotency keys are not available.
- Change-driven workflows use webhooks with x-bs-signature verification instead of polling.
- Request payload hashes, product IDs, endpoint names, and agent run IDs are logged for audit and deduplication.
- The whole workflow is validated in the test environment before it runs against production.
