
Consumer Event Contract

Standardised event types and body shapes for Celerity consumer handlers

When a celerity/consumer is linked to an event source such as a celerity/bucket or celerity/datastore, the Celerity runtime transforms the cloud-specific event payloads into a standardised format before they reach your handler code. This page documents the event contract that consumer handlers receive.

Scope

This contract applies to event-sourced consumers (bucket and datastore). Queue and topic consumers deliver producer messages directly with no event type or body transformation.

Consumer Message

Every message delivered to a consumer handler follows the ConsumerMessage type:

| Field | Type | Description |
| --- | --- | --- |
| messageId | string | Unique identifier for the message |
| body | string | Celerity-standard JSON for event sources; raw producer payload for queues/topics |
| source | string | Fully qualified source identifier (e.g. celerity:bucket:uploads) |
| sourceType | string? | Parsed from source: "bucket", "datastore", "queue", or "topic" |
| sourceName | string? | Parsed from source: the resource name (e.g. "uploads", "orders") |
| eventType | string? | Celerity-standard event type — present for bucket and datastore consumers only |
| messageAttributes | object | Vendor-specific message attributes |
| vendor | any | Full cloud-specific event payload preserved for advanced use cases |

sourceType and sourceName are parsed from the source string format celerity:{type}:{name}. Both are optional because external queue/topic sources may not follow the celerity: prefix convention.
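As a minimal sketch of this parsing rule (the parseSource helper is a hypothetical name for illustration, not an SDK API):

```typescript
// Hypothetical helper illustrating the celerity:{type}:{name} parsing rule.
// Returns no fields when the source does not follow the convention,
// which is why sourceType and sourceName are optional on the message.
function parseSource(source: string): {
  sourceType?: string;
  sourceName?: string;
} {
  const parts = source.split(":");
  if (parts.length === 3 && parts[0] === "celerity") {
    return { sourceType: parts[1], sourceName: parts[2] };
  }
  return {};
}

// parseSource("celerity:bucket:uploads")
//   → { sourceType: "bucket", sourceName: "uploads" }
// parseSource("some-external-queue") → {}
```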

Field naming

The contract uses camelCase field names (e.g. messageId, eventType, sourceType). Each SDK adapts these to the target language's conventions — for example, Python uses snake_case (message_id, event_type, source_type).

Event Types

Bucket Events

Bucket events are produced when objects are created, deleted, or have their metadata updated in a celerity/bucket.

| Celerity Event Type | AWS S3 | GCP Cloud Storage | Azure Blob Storage |
| --- | --- | --- | --- |
| created | s3:ObjectCreated:*, s3:ObjectRestore:* | OBJECT_FINALIZE | Microsoft.Storage.BlobCreated |
| deleted | s3:ObjectRemoved:* | OBJECT_DELETE | Microsoft.Storage.BlobDeleted |
| metadataUpdated | s3:ObjectTagging:*, s3:ObjectAcl:* | OBJECT_METADATA_UPDATE | Microsoft.Storage.BlobTierChanged |

Note

S3 ObjectRestore:* events (Glacier restores) are mapped to created because the object becomes accessible again. GCP and Azure have no direct equivalent for object restoration.
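To make the AWS column of the mapping concrete, here is an illustrative sketch (not the runtime's actual code) of how S3 event names could resolve to Celerity bucket event types:

```typescript
// Illustrative mapping of AWS S3 event names to Celerity bucket event
// types, following the table above. Unrecognised names yield undefined.
function mapS3EventType(s3EventName: string): string | undefined {
  if (
    s3EventName.startsWith("s3:ObjectCreated:") ||
    s3EventName.startsWith("s3:ObjectRestore:") // restore = object accessible again
  ) {
    return "created";
  }
  if (s3EventName.startsWith("s3:ObjectRemoved:")) {
    return "deleted";
  }
  if (
    s3EventName.startsWith("s3:ObjectTagging:") ||
    s3EventName.startsWith("s3:ObjectAcl:")
  ) {
    return "metadataUpdated";
  }
  return undefined;
}
```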

Datastore Events

Datastore events are produced when items are inserted, modified, or removed in a celerity/datastore.

| Celerity Event Type | AWS DynamoDB | GCP Firestore | Azure Cosmos DB |
| --- | --- | --- | --- |
| inserted | INSERT | *.document.v1.created | create (Change Feed) |
| modified | MODIFY | *.document.v1.updated | replace (Change Feed) |
| removed | REMOVE | *.document.v1.deleted | delete (Change Feed) |

Source Types

The sourceType field contains one of the following values when the source follows the celerity:{type}:{name} format:

| Source Type | Value |
| --- | --- |
| Bucket | "bucket" |
| Datastore | "datastore" |
| Queue | "queue" |
| Topic | "topic" |

Event Type Constants

Each SDK provides typed constants so you can compare event types safely without string literals. The underlying string values are the same across all languages:

Bucket event type values:

| Constant | Value |
| --- | --- |
| Created | "created" |
| Deleted | "deleted" |
| MetadataUpdated | "metadataUpdated" |

Datastore event type values:

| Constant | Value |
| --- | --- |
| Inserted | "inserted" |
| Modified | "modified" |
| Removed | "removed" |

See your SDK's API reference for the language-specific constant names (e.g. BucketEventType.Created in TypeScript, BucketEventType.CREATED in Python).

Standardised Body Shapes

When the event source is a bucket or datastore, the Celerity runtime transforms the cloud-specific event body into a standardised JSON shape. The original cloud-specific body is preserved in the vendor field.

Bucket Event Body

```json
{
  "key": "path/to/file.txt",
  "size": 1024,
  "eTag": "abc123def456"
}
```

| Field | Type | Description |
| --- | --- | --- |
| key | string | Object key/path (always present) |
| size | number? | Object size in bytes — present on created, absent on deleted |
| eTag | string? | Entity tag — present on created, absent on deleted |

The bucket name is available via sourceName (e.g. for source celerity:bucket:my-bucket, sourceName is "my-bucket").

Each SDK provides a typed representation of this body shape (e.g. BucketEvent in TypeScript).

Datastore Event Body

```json
{
  "keys": { "userId": "123", "sortKey": "profile" },
  "newItem": { "userId": "123", "sortKey": "profile", "name": "John", "age": 30 },
  "oldItem": { "userId": "123", "sortKey": "profile", "name": "John", "age": 29 }
}
```

| Field | Type | Description |
| --- | --- | --- |
| keys | object | Primary key attributes as plain JSON (always present) |
| newItem | object? | Full item after change — present on inserted/modified, absent on removed |
| oldItem | object? | Full item before change — present on modified/removed (depends on stream view type), absent on inserted |

Automatic Attribute Unmarshaling

Cloud providers like DynamoDB store attributes in a typed format (e.g. {"S": "value"}, {"N": "123"}). The Celerity runtime automatically unmarshals these into plain values before they reach your handler — no manual conversion needed.
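To illustrate what this transformation does, here is a simplified unmarshaling sketch. This is not the runtime's actual code, and real DynamoDB attribute values cover additional types (binary, sets) beyond the common cases shown:

```typescript
// Simplified sketch of DynamoDB-style attribute unmarshaling:
// converts typed attributes like {"S": "value"} into plain JSON values.
type AttributeValue = Record<string, any>;

function unmarshal(attr: AttributeValue): any {
  if ("S" in attr) return attr.S; // string
  if ("N" in attr) return Number(attr.N); // number, stored as a string
  if ("BOOL" in attr) return attr.BOOL; // boolean
  if ("NULL" in attr) return null; // null
  if ("L" in attr) return (attr.L as AttributeValue[]).map(unmarshal); // list
  if ("M" in attr) {
    // map: recursively unmarshal each entry
    const result: Record<string, any> = {};
    for (const [key, value] of Object.entries(attr.M)) {
      result[key] = unmarshal(value as AttributeValue);
    }
    return result;
  }
  throw new Error(`Unsupported attribute type: ${Object.keys(attr)[0]}`);
}

// unmarshal({ M: { userId: { S: "123" }, age: { N: "30" } } })
//   → { userId: "123", age: 30 }
```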

The table name is available via sourceName (e.g. for source celerity:datastore:orders, sourceName is "orders").

Each SDK provides a typed representation of this body shape (e.g. DatastoreEvent in TypeScript).

Handler Examples

Bucket Event Handler

```typescript
import {
  Consumer,
  MessageHandler,
  Messages,
  BucketEventType,
} from "@celerity-sdk/core";
import type { ConsumerMessage, EventResult, BucketEvent } from "@celerity-sdk/core";

@Consumer("uploads-bucket")
class BucketEventHandler {
  @MessageHandler()
  async handle(@Messages() messages: ConsumerMessage[]): Promise<EventResult> {
    for (const msg of messages) {
      const event: BucketEvent = JSON.parse(msg.body);

      if (msg.eventType === BucketEventType.Created) {
        console.log(
          `New object in ${msg.sourceName}: ${event.key} (${event.size} bytes)`
        );
      } else if (msg.eventType === BucketEventType.Deleted) {
        console.log(`Object deleted from ${msg.sourceName}: ${event.key}`);
      }
    }
    return { success: true };
  }
}
```

```python
import json
from celerity_sdk import consumer, message_handler, BucketEventType, EventResult

@consumer("uploads-bucket")
class BucketEventHandler:
    @message_handler()
    async def handle(self, messages):
        for msg in messages:
            event = json.loads(msg.body)

            if msg.event_type == BucketEventType.CREATED:
                print(
                    f"New object in {msg.source_name}: {event['key']} ({event['size']} bytes)"
                )
            elif msg.event_type == BucketEventType.DELETED:
                print(f"Object deleted from {msg.source_name}: {event['key']}")

        return EventResult(success=True)
```

Datastore Event Handler

```typescript
import {
  Consumer,
  MessageHandler,
  Messages,
  DatastoreEventType,
} from "@celerity-sdk/core";
import type {
  ConsumerMessage,
  EventResult,
  DatastoreEvent,
} from "@celerity-sdk/core";

@Consumer("orders-table")
class DatastoreEventHandler {
  @MessageHandler()
  async handle(@Messages() messages: ConsumerMessage[]): Promise<EventResult> {
    for (const msg of messages) {
      const event: DatastoreEvent = JSON.parse(msg.body);

      switch (msg.eventType) {
        case DatastoreEventType.Inserted:
          console.log(`New item in ${msg.sourceName}:`, event.keys);
          console.log("Item:", event.newItem);
          break;
        case DatastoreEventType.Modified:
          console.log(`Item modified in ${msg.sourceName}:`, event.keys);
          console.log("Before:", event.oldItem);
          console.log("After:", event.newItem);
          break;
        case DatastoreEventType.Removed:
          console.log(`Item removed from ${msg.sourceName}:`, event.keys);
          break;
      }
    }
    return { success: true };
  }
}
```

```python
import json
from celerity_sdk import consumer, message_handler, DatastoreEventType, EventResult

@consumer("orders-table")
class DatastoreEventHandler:
    @message_handler()
    async def handle(self, messages):
        for msg in messages:
            event = json.loads(msg.body)

            if msg.event_type == DatastoreEventType.INSERTED:
                print(f"New item in {msg.source_name}:", event["keys"])
                print("Item:", event["newItem"])
            elif msg.event_type == DatastoreEventType.MODIFIED:
                print(f"Item modified in {msg.source_name}:", event["keys"])
                print("Before:", event["oldItem"])
                print("After:", event["newItem"])
            elif msg.event_type == DatastoreEventType.REMOVED:
                print(f"Item removed from {msg.source_name}:", event["keys"])

        return EventResult(success=True)
```

Accessing Original Cloud Payloads

If you need the full cloud-specific event payload (e.g. for accessing fields not in the standardised body), the original payload is preserved in vendor.originalBody:

vendor.originalBody contains the full cloud-specific JSON:

- For bucket events: the S3 notification record, MinIO event, etc.
- For datastore events: the DynamoDB stream record, Firestore event, etc.

Parse vendor.originalBody as JSON in your handler to access the raw cloud payload.

Note

vendor.originalBody is only present when the runtime has transformed the body. For queue and topic consumers, the body is the raw producer payload and no originalBody is provided.
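Putting this together, a minimal sketch of a fallback to the raw payload. The MessageLike interface and rawPayload helper are illustrative names for this sketch, not SDK APIs; in real handler code you would use the ConsumerMessage type from your SDK:

```typescript
// Minimal local shape for illustration; the real message type comes
// from your Celerity SDK.
interface MessageLike {
  body: string;
  vendor?: { originalBody?: string };
}

// Returns the parsed cloud-specific payload, or undefined for
// queue/topic consumers where the body was never transformed and
// no originalBody is provided.
function rawPayload(msg: MessageLike): unknown | undefined {
  const original = msg.vendor?.originalBody;
  if (typeof original !== "string") return undefined;
  return JSON.parse(original);
}
```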

Target Environment Support

| Feature | Local | AWS | GCP | Azure |
| --- | --- | --- | --- | --- |
| Bucket event types | ✅ | ✅ | 🔄 Planned | 🔄 Planned |
| Bucket body transformation | ✅ | ✅ | 🔄 Planned | 🔄 Planned |
| Datastore event types | ✅ | ✅ | 🔄 Planned | 🔄 Planned |
| Datastore body transformation | ✅ | ✅ | 🔄 Planned | 🔄 Planned |
| DynamoDB attribute unmarshaling | ✅ | ✅ | N/A | N/A |
| sourceType / sourceName | ✅ | ✅ | 🔄 Planned | 🔄 Planned |

Local environment: Uses MinIO for bucket events and DynamoDB Local for datastore events. Events are delivered via Redis streams through the Celerity local-events bridge.

AWS environment: Bucket events arrive via S3 notifications to SQS. Datastore events arrive via DynamoDB Streams. In serverless mode (Lambda), the SDK adapter handles transformation. In container mode, the Celerity runtime handles transformation.
