getWritable()
Retrieves the current workflow run's default writable stream. The writable stream can be used in both workflow and step functions to write data that can be read outside the workflow by using the readable property of the Run object.
Use this function in your workflows and steps to produce streaming output that can be consumed by clients in real-time.
This function can only be called inside a workflow or step function (a function that uses the "use workflow" or "use step" directive).
import { getWritable } from 'workflow';

export async function myWorkflow() {
  "use workflow";
  const writable = getWritable();
  const writer = writable.getWriter();
  await writer.write(new TextEncoder().encode('Hello from workflow!'));
  await writer.close();
}

API Signature
Parameters
| Name | Type | Description |
|---|---|---|
| options | WorkflowWritableStreamOptions | Optional configuration for the writable stream |
Returns
WritableStream<W>
Returns a WritableStream<W> where W is the type of data you plan to write to the stream.
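To illustrate what the type parameter `W` constrains, here is a minimal sketch that uses a plain `WritableStream` as a local stand-in for the value `getWritable<W>()` would return. The `StatusUpdate` shape and the `createRecordingStream` and `reportProgress` names are hypothetical and exist only for this illustration.

```typescript
// Hypothetical chunk shape, used only for illustration.
interface StatusUpdate {
  step: string;
  percent: number;
}

// Local stand-in for a workflow stream: records every chunk it receives
// so the result can be inspected. Not part of the `workflow` package.
function createRecordingStream<W>(): { writable: WritableStream<W>; chunks: W[] } {
  const chunks: W[] = [];
  const writable = new WritableStream<W>({
    write(chunk) {
      chunks.push(chunk);
    },
  });
  return { writable, chunks };
}

// Writes structured objects. Because the stream is typed, the compiler
// rejects any chunk that is not a StatusUpdate.
async function reportProgress(writable: WritableStream<StatusUpdate>): Promise<void> {
  const writer = writable.getWriter();
  await writer.write({ step: 'fetch', percent: 50 });
  await writer.write({ step: 'fetch', percent: 100 });
  // await writer.write('done'); // compile-time error: string is not a StatusUpdate
  writer.releaseLock();
}
```

Inside a workflow or step, the same `reportProgress` function could presumably be handed the result of `getWritable<StatusUpdate>()` in place of the recording stream.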
Good to Know
- The stream can be obtained from either workflow or step functions using the same getWritable() call.
- When called from a workflow, the stream can be passed as an argument to steps.
- When called from a step, it retrieves the same workflow-scoped stream directly.
- Always release the writer lock after writing to prevent resource leaks.
- The stream can write binary data (using TextEncoder) or structured objects.
- Remember to close the stream when finished to signal completion.
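The advice about releasing the writer lock can be sketched with standard Web Streams APIs alone. The `withWriter` and `writeText` helpers below are hypothetical names, not part of the `workflow` package. They wrap the write in `try`/`finally` so the lock is released even when the write fails, which leaves the stream usable by the next step.

```typescript
// Hypothetical helper (not part of the `workflow` package): run `fn` with a
// writer and release the lock afterwards, even if `fn` throws or rejects.
async function withWriter<W, R>(
  writable: WritableStream<W>,
  fn: (writer: WritableStreamDefaultWriter<W>) => Promise<R>,
): Promise<R> {
  const writer = writable.getWriter(); // throws a TypeError if already locked
  try {
    return await fn(writer);
  } finally {
    writer.releaseLock();
  }
}

// Binary data: encode the text to bytes with TextEncoder before writing.
async function writeText(
  writable: WritableStream<Uint8Array>,
  text: string,
): Promise<void> {
  await withWriter(writable, (writer) =>
    writer.write(new TextEncoder().encode(text)),
  );
}
```

A step could then call `await writeText(getWritable(), 'Hello, world!')` without handling the writer itself. Note that the helper deliberately does not close the stream, so closing remains a separate, explicit action.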
Examples
Basic Text Streaming
Here's a simple example streaming text data:
import { sleep, getWritable } from 'workflow';

export async function outputStreamWorkflow() {
  "use workflow";
  const writable = getWritable();
  await sleep("1s");
  await stepWithOutputStream(writable);
  await sleep("1s");
  await stepCloseOutputStream(writable);
  return 'done';
}

async function stepWithOutputStream(writable: WritableStream) {
  "use step";
  const writer = writable.getWriter();
  // Write binary data using TextEncoder
  await writer.write(new TextEncoder().encode('Hello, world!'));
  writer.releaseLock();
}

async function stepCloseOutputStream(writable: WritableStream) {
  "use step";
  // Close the stream to signal completion
  await writable.close();
}

Calling getWritable() Inside Steps
You can also call getWritable() directly inside step functions without passing it as a parameter:
import { sleep, getWritable } from 'workflow';

export async function outputStreamFromStepWorkflow() {
  "use workflow";
  // No need to create or pass the stream - steps can get it themselves
  await sleep("1s");
  await stepWithOutputStreamInside();
  await sleep("1s");
  await stepCloseOutputStreamInside();
  return 'done';
}

async function stepWithOutputStreamInside() {
  "use step";
  // Call getWritable() directly inside the step
  const writable = getWritable();
  const writer = writable.getWriter();
  await writer.write(new TextEncoder().encode('Hello from step!'));
  writer.releaseLock();
}

async function stepCloseOutputStreamInside() {
  "use step";
  // Call getWritable() to get the same stream
  const writable = getWritable();
  await writable.close();
}

Using Namespaced Streams in Steps
You can also use namespaced streams when calling getWritable() from steps:
import { getWritable } from 'workflow';

export async function multiStreamWorkflow() {
  "use workflow";
  // Steps will access both streams by namespace
  await writeToDefaultStream();
  await writeToNamedStream();
  await closeStreams();
  return 'done';
}

async function writeToDefaultStream() {
  "use step";
  const writable = getWritable(); // Default stream
  const writer = writable.getWriter();
  await writer.write({ message: 'Default stream data' });
  writer.releaseLock();
}

async function writeToNamedStream() {
  "use step";
  const writable = getWritable({ namespace: 'logs' });
  const writer = writable.getWriter();
  await writer.write({ log: 'Named stream data' });
  writer.releaseLock();
}

async function closeStreams() {
  "use step";
  await getWritable().close(); // Close default stream
  await getWritable({ namespace: 'logs' }).close(); // Close named stream
}

Advanced Chat Streaming
Here's a more complex example showing how you might stream AI chat responses:
import { getWritable } from 'workflow';
import {
  generateId,
  streamText,
  type ModelMessage,
  type UIMessageChunk,
} from 'ai';

// Upper bound on model round trips; the value here is illustrative
const MAX_STEPS = 10;

// ModelMessage is the message type that streamText accepts
export async function chat(messages: ModelMessage[]) {
  'use workflow';
  // Get typed writable stream for UI message chunks
  const writable = getWritable<UIMessageChunk>();
  // Start the stream
  await startStream(writable);
  const currentMessages = [...messages];
  // Process messages in steps
  for (let i = 0; i < MAX_STEPS; i++) {
    const result = await streamTextStep(currentMessages, writable);
    // Append the messages generated in this step to the history
    currentMessages.push(...result.messages);
    if (result.finishReason !== 'tool-calls') {
      break;
    }
  }
  // End the stream
  await endStream(writable);
}

async function startStream(writable: WritableStream<UIMessageChunk>) {
  'use step';
  const writer = writable.getWriter();
  // Send start message
  await writer.write({
    type: 'start',
    messageMetadata: {
      createdAt: Date.now(),
      messageId: generateId(),
    },
  });
  writer.releaseLock();
}

async function streamTextStep(
  messages: ModelMessage[],
  writable: WritableStream<UIMessageChunk>,
) {
  'use step';
  const writer = writable.getWriter();
  // Call streamText from the AI SDK
  const result = streamText({
    model: 'gpt-4',
    messages,
    /* other options */
  });
  // Pipe the AI stream into the writable stream
  const reader = result
    .toUIMessageStream({ sendStart: false, sendFinish: false })
    .getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    await writer.write(value);
  }
  reader.releaseLock();
  // Release the lock but leave the stream open: later iterations still
  // write to it, and endStream() closes it
  writer.releaseLock();
  // Return what the workflow loop needs to decide whether to continue
  return {
    messages: (await result.response).messages,
    finishReason: await result.finishReason,
  };
}

async function endStream(writable: WritableStream<UIMessageChunk>) {
  'use step';
  const writer = writable.getWriter();
  // Close the stream and wait for the close to complete
  await writer.close();
  writer.releaseLock();
}
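The manual read loop in `streamTextStep` could also be expressed with the standard `ReadableStream.pipeTo()` method. This is only a sketch of an alternative, not the approach the example above takes, and the `forwardChunks` name is hypothetical. Passing `preventClose: true` keeps the destination open after the source ends, so a later step can still write to it or close it.

```typescript
// Hypothetical helper: forward every chunk from `source` into `destination`
// without closing the destination when the source finishes.
async function forwardChunks<T>(
  source: ReadableStream<T>,
  destination: WritableStream<T>,
): Promise<void> {
  // pipeTo() locks both streams while piping and releases the locks
  // once the returned promise settles.
  await source.pipeTo(destination, { preventClose: true });
}
```

Under this sketch, the step body would reduce to a single `await forwardChunks(result.toUIMessageStream({ sendStart: false, sendFinish: false }), writable)`, with closing still left to `endStream`.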