Streams (Beta)

Danger: Pay careful attention to the caveats at the bottom of this page.

Streams are a beta feature that lets you stream data from your server to your client with very little code.

Let's take an example where we have a text generation endpoint that looks like this:

server.endpoints.register<Blueprint.Plug>(Blueprint, async (params, auth) => {
  const text = await callMyLLMAndGetABigTextBlock(params.prompt);

  return { text };
});

We can update the blueprint to use the MoopsyStream type from @moopsyjs/core:

import type { MoopsyStream } from "@moopsyjs/core";

export type ParamsType = {
  prompt: string;
} | {
  generate: string;
};
export type ResponseType = {
  text: MoopsyStream<string>;
};
export const Endpoint = "docs/generate-text";
export const Method = "POST";

export interface Plug {
  params: ParamsType;
  response: ResponseType;
  method: typeof Method;
  endpoint: typeof Endpoint;
}

Now in our endpoint, we can construct a WriteableMoopsyStream and pass it down:

import { WriteableMoopsyStream } from "@moopsyjs/server";

server.endpoints.register<Blueprint.Plug>(Blueprint, async (params, auth) => {
  const text = new WriteableMoopsyStream<string>();

  return { text };
});

And just pipe the data in!

import { WriteableMoopsyStream } from "@moopsyjs/server";

server.endpoints.register<Blueprint.Plug>(Blueprint, async (params, auth) => {
  const text = new WriteableMoopsyStream<string>();

  callMyLLM(params.prompt).on("text", (newText) => {
    text.write(newText); // Pass the text
  }).on("end", () => {
    text.end(); // When complete, end the stream
  });

  return { text };
});

On the frontend, the MoopsyStream value in a mutation's response is automatically converted to a ReadableMoopsyStream, which we can use like so:

const res = await generateMutation.call({
  prompt: value // the endpoint handler above reads params.prompt
});

// Whatever type your stream carries, the onData callback receives an array,
// because a single call can deliver multiple values at once.
res.text.onData(data => {
  for (const newText of data) {
    doSomething(newText);
  }
}).onEnd(() => {
  // do something
});
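Because onData delivers values in batches, a common need is to accumulate them into one growing string, for example to render partial LLM output. The following is a minimal sketch, not part of the library: `ReadableLike` and `collectText` are hypothetical names, and the interface models only the `onData` and `onEnd` methods shown above, assuming both return the stream since the example chains them.

```typescript
// Structural type covering only the two methods used in the example above.
// Assumption: both return the stream itself, because the example chains them.
interface ReadableLike<T> {
  onData(cb: (data: T[]) => void): ReadableLike<T>;
  onEnd(cb: () => void): ReadableLike<T>;
}

// Hypothetical helper: appends every streamed piece to one string, reports
// the text so far after each batch, and resolves once the stream ends.
function collectText(
  stream: ReadableLike<string>,
  onUpdate?: (soFar: string) => void,
): Promise<string> {
  return new Promise((resolve) => {
    let soFar = "";
    stream
      .onData((batch) => {
        // A single callback may carry several values, so append each one.
        for (const piece of batch) {
          soFar += piece;
        }
        onUpdate?.(soFar);
      })
      .onEnd(() => resolve(soFar));
  });
}
```

Under these assumptions it could be used as `const full = await collectText(res.text, setDisplayedText);`, where `setDisplayedText` is whatever updates your UI.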

Best Practices

Streams are intended for relatively short-lived data transfers. Some examples:

  • Stream live generated data from an LLM
  • Stream the progress of a short-lived async operation
  • Stream expensive query data that isn't part of the critical path

Caveats

MoopsyStreams must be a top-level value of a response object.

There is currently no automatic check for this, so you must enforce it manually.

✅ Value of Top Level Response Object

export type ResponseType = {
  text: MoopsyStream<string>;
};

❌ Stream as Response

export type ResponseType = MoopsyStream<string>;

❌ Stream as nested value

export type ResponseType = {
  foo: {
    bar: MoopsyStream<string>;
  }
};
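Since nothing enforces this rule automatically, one option is a small runtime check in your own tests or development builds. The sketch below is an illustration only and not part of Moopsy: `findMisplacedStreams` is a hypothetical helper, and it takes the stream test as a predicate because how best to recognise a stream instance is an assumption left to you (for example, an `instanceof WriteableMoopsyStream` check, if that works in your setup).

```typescript
// Hypothetical helper: returns the dotted path of every stream that is not
// a top-level value of the response. Assumes a plain, acyclic response object.
function findMisplacedStreams(
  response: unknown,
  isStream: (value: unknown) => boolean,
): string[] {
  const offenders: string[] = [];

  const visit = (value: unknown, path: string, depth: number): void => {
    if (isStream(value)) {
      // Depth 1 is a top-level value; depth 0 is the response itself.
      if (depth !== 1) {
        offenders.push(path === "" ? "(response)" : path);
      }
      return;
    }
    if (value !== null && typeof value === "object") {
      for (const [key, child] of Object.entries(value)) {
        visit(child, path === "" ? key : `${path}.${key}`, depth + 1);
      }
    }
  };

  visit(response, "", 0);
  return offenders;
}
```

An empty result means every stream sits at the top level. A non-empty result names the offending paths, such as `foo.bar` for the nested example above.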

MoopsyStreams can only be consumed in useMutation()

Queries do not support MoopsyStreams. To consume a MoopsyStream, you must get it from the promise returned by calling .call() on a useMutation() mutation.

Clients cannot start receiving data until the main endpoint handler returns

This means that for streams to be effective, the endpoint handler should create the stream, return immediately, and push data into it asynchronously afterwards.
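The shape of "return first, write later" can be sketched without the library. In the example below, `SketchStream` is a hypothetical stand-in that exposes only the `write()` and `end()` methods used on this page, and `produce` and `handleGenerate` are illustrative names. The real handler is the async function passed to `server.endpoints.register`, but the ordering is the same.

```typescript
// Hypothetical stand-in exposing only write() and end(), the two methods
// used on this page. It records what was written so the order is visible.
class SketchStream<T> {
  readonly written: T[] = [];
  ended = false;
  write(value: T): void {
    this.written.push(value);
  }
  end(): void {
    this.ended = true;
  }
}

// Hypothetical producer: pushes each part after an asynchronous pause.
async function produce(text: SketchStream<string>, parts: string[]): Promise<void> {
  for (const part of parts) {
    await Promise.resolve(); // stands in for waiting on real work
    text.write(part);
  }
  text.end();
}

// Handler shape: create the stream, start the work WITHOUT awaiting it,
// and return straight away so the client can begin receiving data.
function handleGenerate(parts: string[]): { text: SketchStream<string> } {
  const text = new SketchStream<string>();
  // Deliberately not awaited. If the producer fails, end the stream.
  void produce(text, parts).catch(() => text.end());
  return { text };
}
```

Awaiting `produce` before the `return` would keep the handler open until all the work had finished, which is the situation this caveat warns against.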