Preprocessors
Scripts and flows can include a preprocessor to transform incoming requests before they are passed to the runnable. The preprocessor is only called when the runnable is triggered via a webhook, an HTTP route, an email trigger, a WebSocket trigger, a Kafka trigger, a NATS trigger, a Postgres trigger, an SQS trigger or an MQTT trigger.
This approach is useful for preprocessing arguments differently depending on the trigger before the execution of the runnable. It also separates the handling of arguments according to whether they are called by a trigger or from the UI, which can help you keep a simple schema form in the UI for the runnable.
The preprocessor receives an event
parameter, which contains all the main trigger data plus additional metadata.
The object always contain a kind
field that contains the type of trigger. Other arguments are specific to the trigger type.
You can find more details about the arguments format and the structure of event
for each trigger kind in their respective documentation pages, or below in the templates.
Preprocessors can only be written in TypeScript (Bun/Deno) or Python.
Script preprocessor
In scripts, you need to export an additional preprocessor
function.
The returned object defines the parameter values passed to main()
.
For instance, returning { b: 1, a: 2 }
in the preprocessor will call main(2, 1)
, assuming main
is defined as main(a: number, b: number)
.
Ensure that the parameter names in main
match the keys in the returned object.
Here are some templates for scripts with preprocessors in TypeScript and Python:
- TypeScript
- Python
export async function preprocessor(
event:
| {
kind: "webhook";
body: any,
raw_string: string | null,
query: Record<string, string>;
headers: Record<string, string>;
}
| {
kind: "http";
body: any,
raw_string: string | null,
route: string;
path: string;
method: string;
params: Record<string, string>;
query: Record<string, string>;
headers: Record<string, string>;
}
| {
kind: "email";
parsed_email: any,
raw_email: string,
}
| { kind: "websocket"; msg: string; url: string }
| {
kind: "kafka";
payload: string;
brokers: string[];
topic: string;
group_id: string;
}
| {
kind: "nats";
payload: string;
servers: string[];
subject: string;
headers?: Record<string, string[]>;
status?: number;
description?: string;
length: number;
}
| {
kind: "sqs";
msg: string,
queue_url: string;
message_id?: string;
receipt_handle?: string;
attributes: Record<string, string>;
message_attributes?: Record<
string,
{ string_value?: string; data_type: string }
>;
}
| {
kind: "mqtt";
payload: string,
topic: string;
retain: boolean;
pkid: number;
qos: number;
v5?: {
payload_format_indicator?: number;
topic_alias?: number;
response_topic?: string;
correlation_data?: Array<number>;
user_properties?: Array<[string, string]>;
subscription_identifiers?: Array<number>;
content_type?: string;
};
}
| {
kind: "gcp";
payload: string,
message_id: string;
subscription: string;
ordering_key?: string;
attributes?: Record<string, string>;
delivery_type: "push" | "pull";
headers?: Record<string, string>;
publish_time?: string;
}
) {
// you can use a switch statement to handle different trigger kinds
switch (event.kind) {
case "webhook":
return {
// return the args to be passed to the runnable
};
case "http":
return {
// return the args to be passed to the runnable
};
// ...
default:
throw new Error(`Unsupported trigger kind: ${event.kind}`);
}
}
export async function main(/* main args */) {
// your code here
}
from typing import TypedDict, Literal, Optional, Union
class WebhookEvent(TypedDict):
kind: Literal["webhook"]
body: dict
raw_string: Optional[str]
query: dict[str, str]
headers: dict[str, str]
class HttpEvent(TypedDict):
kind: Literal["http"]
body: dict
raw_string: Optional[str]
route: str
path: str
method: str
params: dict[str, str]
query: dict[str, str]
headers: dict[str, str]
class EmailEvent(TypedDict):
kind: Literal["email"]
parsed_email: dict
raw_email: str
class WebsocketEvent(TypedDict):
kind: Literal["websocket"]
msg: str
url: str
class KafkaEvent(TypedDict):
kind: Literal["kafka"]
payload: str
brokers: list[str]
topic: str
group_id: str
class NatsEvent(TypedDict):
kind: Literal["nats"]
payload: str
servers: list[str]
subject: str
headers: Optional[dict[str, list[str]]]
status: Optional[int]
description: Optional[str]
length: int
class MessageAttribute(TypedDict):
string_value: Optional[str]
data_type: str
class SqsEvent(TypedDict):
kind: Literal["sqs"]
msg: str
queue_url: str
message_id: Optional[str]
receipt_handle: Optional[str]
attributes: dict[str, str]
message_attributes: Optional[dict[str, MessageAttribute]]
class MqttV5Properties(TypedDict, total=False):
payload_format_indicator: Optional[int]
topic_alias: Optional[int]
response_topic: Optional[str]
correlation_data: Optional[list[int]]
user_properties: Optional[list[tuple[str, str]]]
subscription_identifiers: Optional[list[int]]
content_type: Optional[str]
class MqttEvent(TypedDict):
kind: Literal["mqtt"]
payload: str
topic: str
retain: bool
pkid: int
qos: int
v5: Optional[MqttV5Properties]
class GcpEvent(TypedDict):
kind: Literal["gcp"]
payload: str
message_id: str
subscription: str
ordering_key: Optional[str]
attributes: Optional[dict[str, str]]
delivery_type: Literal["push", "pull"]
headers: Optional[dict[str, str]]
publish_time: Optional[str]
Event = Union[
WebhookEvent,
HttpEvent,
EmailEvent,
WebsocketEvent,
KafkaEvent,
NatsEvent,
SqsEvent,
MqttEvent,
GcpEvent,
]
def preprocessor(event: Event):
# you can use a switch statement to handle different trigger kinds
match event["kind"]:
case "webhook":
return {
# return the args to be passed to the runnable
}
case "http":
return {
# return the args to be passed to the runnable
}
# ...
case _:
raise ValueError(f"Unsupported trigger kind: {event['kind']}")
def main(): # add the parameters you expect from the preprocessor
# your code here
Once a preprocessor is created, you should see a new tab in the right panel of the editor that allows you to test the preprocessor with a sample request.
Flow preprocessor
For flows, the idea is similar but the preprocessor is a standalone step that returns only a preprocessor
function.
To create a preprocessor for a flow, click on the plus button above the Input
step:
The returned object determines the parameter values passed to the flow.
For instance, returning { b: 1, a: 2 }
in the preprocessor will call the flow with a = 2
and b = 1
, assuming the flow has two inputs called a
and b
.
Ensure that the input names of the flow match the keys in the returned object.
Below you'll find preprocessor step templates for flows in TypeScript and Python:
- TypeScript
- Python
export async function preprocessor(
event:
| {
kind: "webhook";
body: any,
raw_string: string | null,
query: Record<string, string>;
headers: Record<string, string>;
}
| {
kind: "http";
body: any,
raw_string: string | null,
route: string;
path: string;
method: string;
params: Record<string, string>;
query: Record<string, string>;
headers: Record<string, string>;
}
| {
kind: "email";
parsed_email: any,
raw_email: string,
}
| { kind: "websocket"; msg: string; url: string }
| {
kind: "kafka";
payload: string;
brokers: string[];
topic: string;
group_id: string;
}
| {
kind: "nats";
payload: string;
servers: string[];
subject: string;
headers?: Record<string, string[]>;
status?: number;
description?: string;
length: number;
}
| {
kind: "sqs";
msg: string,
queue_url: string;
message_id?: string;
receipt_handle?: string;
attributes: Record<string, string>;
message_attributes?: Record<
string,
{ string_value?: string; data_type: string }
>;
}
| {
kind: "mqtt";
payload: string,
topic: string;
retain: boolean;
pkid: number;
qos: number;
v5?: {
payload_format_indicator?: number;
topic_alias?: number;
response_topic?: string;
correlation_data?: Array<number>;
user_properties?: Array<[string, string]>;
subscription_identifiers?: Array<number>;
content_type?: string;
};
}
| {
kind: "gcp";
payload: string,
message_id: string;
subscription: string;
ordering_key?: string;
attributes?: Record<string, string>;
delivery_type: "push" | "pull";
headers?: Record<string, string>;
publish_time?: string;
}
) {
// you can use a switch statement to handle different trigger kinds
switch (event.kind) {
case "webhook":
return {
// return the args to be passed to the runnable
};
case "http":
return {
// return the args to be passed to the runnable
};
// ...
default:
throw new Error(`Unsupported trigger kind: ${event.kind}`);
}
}
from typing import TypedDict, Literal, Optional, Union
class WebhookEvent(TypedDict):
kind: Literal["webhook"]
body: dict
raw_string: Optional[str]
query: dict[str, str]
headers: dict[str, str]
class HttpEvent(TypedDict):
kind: Literal["http"]
body: dict
raw_string: Optional[str]
route: str
path: str
method: str
params: dict[str, str]
query: dict[str, str]
headers: dict[str, str]
class EmailEvent(TypedDict):
kind: Literal["email"]
parsed_email: dict
raw_email: str
class WebsocketEvent(TypedDict):
kind: Literal["websocket"]
msg: str
url: str
class KafkaEvent(TypedDict):
kind: Literal["kafka"]
payload: str
brokers: list[str]
topic: str
group_id: str
class NatsEvent(TypedDict):
kind: Literal["nats"]
payload: str
servers: list[str]
subject: str
headers: Optional[dict[str, list[str]]]
status: Optional[int]
description: Optional[str]
length: int
class MessageAttribute(TypedDict):
string_value: Optional[str]
data_type: str
class SqsEvent(TypedDict):
kind: Literal["sqs"]
msg: str
queue_url: str
message_id: Optional[str]
receipt_handle: Optional[str]
attributes: dict[str, str]
message_attributes: Optional[dict[str, MessageAttribute]]
class MqttV5Properties(TypedDict, total=False):
payload_format_indicator: Optional[int]
topic_alias: Optional[int]
response_topic: Optional[str]
correlation_data: Optional[list[int]]
user_properties: Optional[list[tuple[str, str]]]
subscription_identifiers: Optional[list[int]]
content_type: Optional[str]
class MqttEvent(TypedDict):
kind: Literal["mqtt"]
payload: str
topic: str
retain: bool
pkid: int
qos: int
v5: Optional[MqttV5Properties]
class GcpEvent(TypedDict):
kind: Literal["gcp"]
payload: str
message_id: str
subscription: str
ordering_key: Optional[str]
attributes: Optional[dict[str, str]]
delivery_type: Literal["push", "pull"]
headers: Optional[dict[str, str]]
publish_time: Optional[str]
Event = Union[
WebhookEvent,
HttpEvent,
EmailEvent,
WebsocketEvent,
KafkaEvent,
NatsEvent,
SqsEvent,
MqttEvent,
GcpEvent,
]
def preprocessor(event: Event):
# you can use a switch statement to handle different trigger kinds
match event["kind"]:
case "webhook":
return {
# return the args to be passed to the runnable
}
case "http":
return {
# return the args to be passed to the runnable
}
# ...
case _:
raise ValueError(f"Unsupported trigger kind: {event['kind']}")