Consumer Event Contract
Standardised event types and body shapes for Celerity consumer handlers
When a celerity/consumer is linked to an event source such as a celerity/bucket or celerity/datastore, the Celerity runtime transforms the cloud-specific event payloads into a standardised format before they reach your handler code. This page documents the event contract that consumer handlers receive.
Scope
This contract applies to event-sourced consumers (bucket and datastore). Queue and topic consumers deliver producer messages directly with no event type or body transformation.
Consumer Message
Every message delivered to a consumer handler follows the ConsumerMessage type:
| Field | Type | Description |
|---|---|---|
messageId | string | Unique identifier for the message |
body | string | Celerity-standard JSON for event sources; raw producer payload for queues/topics |
source | string | Fully qualified source identifier (e.g. celerity:bucket:uploads) |
sourceType | string? | Parsed from source: "bucket", "datastore", "queue", or "topic" |
sourceName | string? | Parsed from source: the resource name (e.g. "uploads", "orders") |
eventType | string? | Celerity-standard event type — present for bucket and datastore consumers only |
messageAttributes | object | Vendor-specific message attributes |
vendor | any | Full cloud-specific event payload preserved for advanced use cases |
sourceType and sourceName are parsed from the source string format celerity:{type}:{name}. Both are optional because external queue/topic sources may not follow the celerity: prefix convention.
Field naming
The contract uses camelCase field names (e.g. messageId, eventType, sourceType). Each SDK adapts these to the target language's conventions — for example, Python uses snake_case (message_id, event_type, source_type).
Event Types
Bucket Events
Bucket events are produced when objects are created, deleted, or have their metadata updated in a celerity/bucket.
| Celerity Event Type | AWS S3 | GCP Cloud Storage | Azure Blob Storage |
|---|---|---|---|
created | s3:ObjectCreated:*, s3:ObjectRestore:* | OBJECT_FINALIZE | Microsoft.Storage.BlobCreated |
deleted | s3:ObjectRemoved:* | OBJECT_DELETE | Microsoft.Storage.BlobDeleted |
metadataUpdated | s3:ObjectTagging:*, s3:ObjectAcl:* | OBJECT_METADATA_UPDATE | Microsoft.Storage.BlobTierChanged |
Note
S3 ObjectRestore:* events (Glacier restores) are mapped to created because the object becomes accessible again. GCP and Azure have no direct equivalent for object restoration.
Datastore Events
Datastore events are produced when items are inserted, modified, or removed in a celerity/datastore.
| Celerity Event Type | AWS DynamoDB | GCP Firestore | Azure Cosmos DB |
|---|---|---|---|
inserted | INSERT | *.document.v1.created | create (Change Feed) |
modified | MODIFY | *.document.v1.updated | replace (Change Feed) |
removed | REMOVE | *.document.v1.deleted | delete (Change Feed) |
Source Types
The sourceType field contains one of the following values when the source follows the celerity:{type}:{name} format:
| Source Type | Value |
|---|---|
| Bucket | "bucket" |
| Datastore | "datastore" |
| Queue | "queue" |
| Topic | "topic" |
Event Type Constants
Each SDK provides typed constants so you can compare event types safely without string literals. The underlying string values are the same across all languages:
Bucket event type values:
| Constant | Value |
|---|---|
| Created | "created" |
| Deleted | "deleted" |
| MetadataUpdated | "metadataUpdated" |
Datastore event type values:
| Constant | Value |
|---|---|
| Inserted | "inserted" |
| Modified | "modified" |
| Removed | "removed" |
See your SDK's API reference for the language-specific constant names (e.g. BucketEventType.Created in TypeScript, BucketEventType.CREATED in Python).
Standardised Body Shapes
When the event source is a bucket or datastore, the Celerity runtime transforms the cloud-specific event body into a standardised JSON shape. The original cloud-specific body is preserved in the vendor field.
Bucket Event Body
{
"key": "path/to/file.txt",
"size": 1024,
"eTag": "abc123def456"
}| Field | Type | Description |
|---|---|---|
key | string | Object key/path (always present) |
size | number? | Object size in bytes — present on created, absent on deleted |
eTag | string? | Entity tag — present on created, absent on deleted |
The bucket name is available via sourceName (e.g. for source celerity:bucket:my-bucket, sourceName is "my-bucket").
Each SDK provides a typed representation of this body shape (e.g. BucketEvent in TypeScript).
Datastore Event Body
{
"keys": { "userId": "123", "sortKey": "profile" },
"newItem": { "userId": "123", "sortKey": "profile", "name": "John", "age": 30 },
"oldItem": { "userId": "123", "sortKey": "profile", "name": "John", "age": 29 }
}| Field | Type | Description |
|---|---|---|
keys | object | Primary key attributes as plain JSON (always present) |
newItem | object? | Full item after change — present on inserted/modified, absent on removed |
oldItem | object? | Full item before change — present on modified/removed (depends on stream view type), absent on inserted |
Automatic Attribute Unmarshaling
Cloud providers like DynamoDB store attributes in a typed format (e.g. {"S": "value"}, {"N": "123"}). The Celerity runtime automatically unmarshals these into plain values before they reach your handler — no manual conversion needed.
The table name is available via sourceName (e.g. for source celerity:datastore:orders, sourceName is "orders").
Each SDK provides a typed representation of this body shape (e.g. DatastoreEvent in TypeScript).
Handler Examples
Bucket Event Handler
import {
Consumer,
MessageHandler,
Messages,
BucketEventType,
} from "@celerity-sdk/core";
import type { ConsumerMessage, EventResult, BucketEvent } from "@celerity-sdk/core";
@Consumer("uploads-bucket")
class BucketEventHandler {
@MessageHandler()
async handle(@Messages() messages: ConsumerMessage[]): Promise<EventResult> {
for (const msg of messages) {
const event: BucketEvent = JSON.parse(msg.body);
if (msg.eventType === BucketEventType.Created) {
console.log(
`New object in ${msg.sourceName}: ${event.key} (${event.size} bytes)`
);
} else if (msg.eventType === BucketEventType.Deleted) {
console.log(`Object deleted from ${msg.sourceName}: ${event.key}`);
}
}
return { success: true };
}
}import json
from celerity_sdk import consumer, message_handler, BucketEventType, EventResult
@consumer("uploads-bucket")
class BucketEventHandler:
@message_handler()
async def handle(self, messages):
for msg in messages:
event = json.loads(msg.body)
if msg.event_type == BucketEventType.CREATED:
print(
f"New object in {msg.source_name}: {event['key']} ({event['size']} bytes)"
)
elif msg.event_type == BucketEventType.DELETED:
print(f"Object deleted from {msg.source_name}: {event['key']}")
return EventResult(success=True)Datastore Event Handler
import {
Consumer,
MessageHandler,
Messages,
DatastoreEventType,
} from "@celerity-sdk/core";
import type {
ConsumerMessage,
EventResult,
DatastoreEvent,
} from "@celerity-sdk/core";
@Consumer("orders-table")
class DatastoreEventHandler {
@MessageHandler()
async handle(@Messages() messages: ConsumerMessage[]): Promise<EventResult> {
for (const msg of messages) {
const event: DatastoreEvent = JSON.parse(msg.body);
switch (msg.eventType) {
case DatastoreEventType.Inserted:
console.log(`New item in ${msg.sourceName}:`, event.keys);
console.log("Item:", event.newItem);
break;
case DatastoreEventType.Modified:
console.log(`Item modified in ${msg.sourceName}:`, event.keys);
console.log("Before:", event.oldItem);
console.log("After:", event.newItem);
break;
case DatastoreEventType.Removed:
console.log(`Item removed from ${msg.sourceName}:`, event.keys);
break;
}
}
return { success: true };
}
}import json
from celerity_sdk import consumer, message_handler, DatastoreEventType, EventResult
@consumer("orders-table")
class DatastoreEventHandler:
@message_handler()
async def handle(self, messages):
for msg in messages:
event = json.loads(msg.body)
if msg.event_type == DatastoreEventType.INSERTED:
print(f"New item in {msg.source_name}:", event["keys"])
print("Item:", event["newItem"])
elif msg.event_type == DatastoreEventType.MODIFIED:
print(f"Item modified in {msg.source_name}:", event["keys"])
print("Before:", event["oldItem"])
print("After:", event["newItem"])
elif msg.event_type == DatastoreEventType.REMOVED:
print(f"Item removed from {msg.source_name}:", event["keys"])
return EventResult(success=True)Accessing Original Cloud Payloads
If you need the full cloud-specific event payload (e.g. for accessing fields not in the standardised body), the original payload is preserved in vendor.originalBody:
// vendor.originalBody contains the full cloud-specific JSON:
// - For bucket events: the S3 notification record, MinIO event, etc.
// - For datastore events: the DynamoDB stream record, Firestore event, etc.Parse vendor.originalBody as JSON in your handler to access the raw cloud payload.
Note
vendor.originalBody is only present when the runtime has transformed the body. For queue and topic consumers, the body is the raw producer payload and no originalBody is provided.
Target Environment Support
| Feature | Local | AWS | GCP | Azure |
|---|---|---|---|---|
| Bucket event types | ✅ | ✅ | 🔄 Planned | 🔄 Planned |
| Bucket body transformation | ✅ | ✅ | 🔄 Planned | 🔄 Planned |
| Datastore event types | ✅ | ✅ | 🔄 Planned | 🔄 Planned |
| Datastore body transformation | ✅ | ✅ | 🔄 Planned | 🔄 Planned |
| DynamoDB attribute unmarshaling | ✅ | ✅ | N/A | N/A |
sourceType / sourceName | ✅ | ✅ | 🔄 Planned | 🔄 Planned |
Local environment: Uses MinIO for bucket events and DynamoDB Local for datastore events. Events are delivered via Redis streams through the Celerity local-events bridge.
AWS environment: Bucket events arrive via S3 notifications to SQS. Datastore events arrive via DynamoDB Streams. In serverless mode (Lambda), the SDK adapter handles transformation. In container mode, the Celerity runtime handles transformation.
Last updated on