GraphQL Subscriptions
Subscriptions notify you when an event occurs on the server side.
This feature requires PostGraphile v4.4.0 or higher.
Introduction
Pass --subscriptions
(or subscriptions: true
) to PostGraphile and we'll
enhance GraphiQL with subscription capabilities and give your PostGraphile
server the power of websocket communications. This will enable the websocket
endpoint.
Although you can now use makeExtendSchemaPlugin
to add your own subscription
fields using your own realtime events, it's likely that you'll want to add the
@graphile/pg-pubsub
realtime provider plugin
so that you can leverage PostgreSQL's built-in pubsub support. For example, this
plugin will allow makeExtendSchemaPlugin
to use the @pgSubscriptions
directive to easily define your own subscriptions using PostgreSQL's
LISTEN
/NOTIFY
(recommended for production). This plugin also adds the
--simple-subscriptions
flag that can be used to add a simple listen
subscription field to your GraphQL API (useful for experimentation). See below
how to enable the plugin for each approach.
If you just use the --subscriptions
flag alone, you'll notice that your schema
still only has query
and mutation
operation types. To add subscriptions to
your GraphQL schema you'll need a plugin to provide the relevant subscription
fields (by extending the Subscription
type) - or you can write your own
with makeExtendSchemaPlugin
.
The easiest way to get started is with Simple Subscriptions (see below) but we recommend that you take the Custom Subscriptions approach as it allows you to be much more expressive about the realtime features of your GraphQL API.
NOTE: the endpoint for subscriptions is the same as for GraphQL, except the
protocol is changed from http
or https
to ws
or wss
respectively.
Custom Subscriptions
In this implementation, you use PostGraphile's extensibility to define the exact subscriptions you need.
Writing the plugin
Using makeExtendSchemaPlugin
we can define a new subscription field and the
subscription payload type that it returns. Using the
@pgSubscription(topic: ...)
directive provided by @graphile/pg-pubsub
we can
embed a function that will calculate the PostgreSQL topic to subscribe to based
on the arguments and context passed to the GraphQL field (in this case factoring
in the user ID).
// MySubscriptionPlugin.js
const { makeExtendSchemaPlugin, gql, embed } = require("graphile-utils");
// or: import { makeExtendSchemaPlugin, gql, embed } from 'graphile-utils';
const currentUserTopicFromContext = async (_args, context, _resolveInfo) => {
if (context.jwtClaims.user_id) {
return `graphql:user:${context.jwtClaims.user_id}`;
} else {
throw new Error("You're not logged in");
}
};
module.exports = makeExtendSchemaPlugin(({ pgSql: sql }) => ({
typeDefs: gql`
type UserSubscriptionPayload {
# This is populated by our resolver below
user: User
# This is returned directly from the PostgreSQL subscription payload (JSON object)
event: String
}
extend type Subscription {
"""
Triggered when the current user's data changes:
- direct modifications to the user
- when their organization membership changes
"""
currentUserUpdated: UserSubscriptionPayload @pgSubscription(topic: ${embed(
currentUserTopicFromContext,
)})
}
`,
resolvers: {
UserSubscriptionPayload: {
// This method finds the user from the database based on the event
// published by PostgreSQL.
//
// In a future release, we hope to enable you to replace this entire
// method with a small schema directive above, should you so desire. It's
// mostly boilerplate.
async user(
event,
_args,
_context,
{ graphile: { selectGraphQLResultFromTable } },
) {
const rows = await selectGraphQLResultFromTable(
sql.fragment`app_public.users`,
(tableAlias, sqlBuilder) => {
sqlBuilder.where(
sql.fragment`${tableAlias}.id = ${sql.value(event.subject)}`,
);
},
);
return rows[0];
},
},
},
}));
I'm using a fairly complex PostgreSQL function so that I can just use `CREATE TRIGGER` to trigger events in future without having to define a function for each trigger. Click this paragraph to expand and see the function.
create function app_public.graphql_subscription() returns trigger as $$
declare
v_process_new bool = (TG_OP = 'INSERT' OR TG_OP = 'UPDATE');
v_process_old bool = (TG_OP = 'UPDATE' OR TG_OP = 'DELETE');
v_event text = TG_ARGV[0];
v_topic_template text = TG_ARGV[1];
v_attribute text = TG_ARGV[2];
v_record record;
v_sub text;
v_topic text;
v_i int = 0;
v_last_topic text;
begin
-- On UPDATE sometimes topic may be changed for NEW record,
-- so we need notify to both topics NEW and OLD.
for v_i in 0..1 loop
if (v_i = 0) and v_process_new is true then
v_record = new;
elsif (v_i = 1) and v_process_old is true then
v_record = old;
else
continue;
end if;
if v_attribute is not null then
execute 'select $1.' || quote_ident(v_attribute)
using v_record
into v_sub;
end if;
if v_sub is not null then
v_topic = replace(v_topic_template, '$1', v_sub);
else
v_topic = v_topic_template;
end if;
if v_topic is distinct from v_last_topic then
-- This if statement prevents us from triggering the same notification twice
v_last_topic = v_topic;
perform pg_notify(v_topic, json_build_object(
'event', v_event,
'subject', v_sub
)::text);
end if;
end loop;
return v_record;
end;
$$ language plpgsql volatile set search_path from current;
Hooking the database up to a GraphQL subscription is now just the case of
CREATE TRIGGER
:
CREATE TRIGGER _500_gql_update
AFTER UPDATE ON app_public.users
FOR EACH ROW
EXECUTE PROCEDURE app_public.graphql_subscription(
'userChanged', -- the "event" string, useful for the client to know what happened
'graphql:user:$1', -- the "topic" the event will be published to, as a template
'id' -- If specified, `$1` above will be replaced with NEW.id or OLD.id from the trigger.
);
CREATE TRIGGER _500_gql_update_member
AFTER INSERT OR UPDATE OR DELETE ON app_public.organization_members
FOR EACH ROW
EXECUTE PROCEDURE app_public.graphql_subscription('organizationsChanged', 'graphql:user:$1', 'member_id');
Enabling with the CLI
Load the @graphile/pg-pubsub
server plugin (--plugins
, or pluginHook
for
the library), our custom subscription schema plugin (--append-plugins
, or
appendPlugins
for the library), and enable PostGraphile server's websockets
support with --subscriptions
(or subscriptions: true
for the library).
postgraphile \
--plugins @graphile/pg-pubsub \
--append-plugins `pwd`/MySubscriptionPlugin.js \
--subscriptions \
-c mydb
Enabling with an Express app
When using PostGraphile as a library, you may enable Custom Subscriptions by
passing the pluginHook
with the @graphile/pg-pubsub
plugin, setting
subscriptions: true
and adding your custom plugin.
We emulate part of the Express stack, so if you require sessions you can pass
additional Connect/Express middleware (sorry, we don't support Koa middleware
here at this time) via the websocketMiddlewares
option.
Here's an example:
const express = require("express");
const { postgraphile, makePluginHook } = require("postgraphile");
const MySubscriptionPlugin = require("./MySubscriptionPlugin"); // our plugin defined in previous step
const { default: PgPubsub } = require("@graphile/pg-pubsub"); // remember to install through yarn/npm
const pluginHook = makePluginHook([PgPubsub]);
const postgraphileOptions = {
pluginHook, // add the plugin hook. This will make the @pgSubscription avaiable in our schema definitions
subscriptions: true, // start the websocket server
appendPlugins: [MySubscriptionPlugin], // Load our plugin
websocketMiddlewares: [
// Add whatever middleware you need here, note that they should only
// manipulate properties on req/res, they must not sent response data. e.g.:
//
// require('express-session')(),
// require('passport').initialize(),
// require('passport').session(),
],
};
const app = express();
app.use(postgraphile(databaseUrl, "app_public", postgraphileOptions));
app.listen(parseInt(process.env.PORT, 10) || 3000);
Testing your subscription with GraphiQL/GraphQL Playground
To test your subscription you will need to first subscribe and then trigger it.
To subscribe, in one GraphiQL tab execute
subscription MySubscription {
currentUserUpdated {
user
event
}
}
You should get the answer: "Waiting for subscription to yield data…"
To trigger the subscription, in another GraphiQL tab run a mutation that changes the user. This will depend on your implementation, for example:
mutation MyMutation {
updateUserById(input: { userPatch: { name: "foo" }, id: 27 }) {
clientMutationId
}
}
In this tab you will get the regular mutation answer. Going back to the previous tab, you will see the subscription payload. You are good to go! This should serve as the basis to implement your own custom subscriptions.
Simple Subscriptions
In this implementation, we have @graphile/pg-pubsub
automatically expose a
generic listen
handler that can be used with arbitrary topics in PostgreSQL —
it requires very little ahead-of-time planning.
Enabling with the CLI
To enable Simple Subscriptions via the CLI, just load the @graphile/pg-pubsub
plugin and pass the --subscriptions
and --simple-subscriptions
flags.
postgraphile \
--plugins @graphile/pg-pubsub \
--subscriptions \
--simple-subscriptions \
-c mydb
Enabling with an Express app
When using PostGraphile as a library, you may enable Simple Subscriptions by
passing the pluginHook
with the @graphile/pg-pubsub
plugin and using
simpleSubscriptions: true
.
We emulate part of the Express stack, so if you require sessions you can pass
additional Connect/Express middleware (sorry, we don't support Koa middleware
here at this time) via the websocketMiddlewares
option.
Here's an example:
const express = require("express");
const { postgraphile, makePluginHook } = require("postgraphile");
const { default: PgPubsub } = require("@graphile/pg-pubsub");
const pluginHook = makePluginHook([PgPubsub]);
const postgraphileOptions = {
pluginHook,
subscriptions: true,
simpleSubscriptions: true,
websocketMiddlewares: [
// Add whatever middleware you need here, note that they should only
// manipulate properties on req/res, they must not sent response data. e.g.:
//
// require('express-session')(),
// require('passport').initialize(),
// require('passport').session(),
],
};
const app = express();
app.use(postgraphile(databaseUrl, "app_public", postgraphileOptions));
app.listen(parseInt(process.env.PORT, 10) || 3000);
Using
Simple subscriptions exposes a listen
field to the Subscription
type that
can be used for generic subscriptions to a named topic. This topic can be
triggered using PostgreSQL's built in LISTEN/NOTIFY functionality (but remember
to add the prefix - see below).
type ListenPayload {
query: Query
relatedNode: Node
relatedNodeId: ID
}
type Subscription {
listen(topic: String!): ListenPayload!
}
PostGraphile's built in GraphiQL now supports subscriptions, so you can use it directly to test your application.
Topic prefix
Note: this section only applies to "simple subscriptions."
All topics requested from GraphQL are automatically prefixed with
postgraphile:
* to avoid leaking other topics your application may be using
- GraphQL consumers will not need to know about this, but you will need to
remember to add it to the topic when you perform
NOTIFY
otherwise subscribers will not see the messages.
* This is applied by default, but you can override it via
pgSubscriptionPrefix
setting in the graphileBuildOptions
object; e.g.
postgraphile(DATABASE_URL, SCHEMAS, {pluginHook, subscriptions: true, graphileBuildOptions: { pgSubscriptionPrefix: "MyPrefix:"}})
.
Note further that this setting only applies to simple subscriptions, custom
subscriptions have no automatic prefix.
For example a user may perform the following subscription:
subscription {
listen(topic: "hello") {
relatedNodeId
relatedNode {
nodeId
... on Foo {
id
title
}
}
}
}
To cause the subscription to receive a message, you could run the following in PostgreSQL:
select pg_notify(
'postgraphile:hello',
'{}'
);
Resulting in this GraphQL payload:
{
"data": {
"listen": {
"relatedNodeId": null,
"relatedNode": null
}
}
}
}
Which is sufficient to know that the event occurred. Chances are that you want to know more than this...
It's also possible to send a Node
along with your GraphQL payload using the
__node__
field on the pg_notify
body (which is interpreted as JSON). The
__node__
field is similar to the nodeId
(or id
if you use --classic-ids
)
field in your GraphQL requests, except it's the raw JSON before it gets
stringified and base64 encoded. (The reason for this is that Postgres' JSON
functions leave some optional spaces in, so when they are base64 encoded the
strings do not match.)
Assuming that you have a table of the form
foos(id serial primary key, title text, ...)
you can add the __node__
field
as follows and the record with id=32 will be made available as the relatedNode
in the GraphQL subscription payload:
select pg_notify(
'postgraphile:hello',
json_build_object(
'__node__', json_build_array(
'foos', -- IMPORTANT: this is not always exactly the table name; base64
-- decode an existing nodeId to see what it should be.
32 -- The primary key (for multiple keys, list them all).
)
)::text
);
Resulting in this GraphQL payload:
{
"data": {
"listen": {
"relatedNodeId": "WyJmb29zIiwzMl0=",
"relatedNode": {
"nodeId": "WyJmb29zIiwzMl0=",
"id": 32,
"title": "Howdy!"
}
}
}
}
NOTE: This solution is still taking shape, so it's not yet certain how other fields on the NOTIFY message JSON will be exposed via GraphQL. You are advised to treat the content of this message JSON as if it's visible to the user, as at some point it may be.
NOTE: In PostgreSQL the channel is an "identifier" which by default is limited to 63 characters. Subtracting the
postgraphile:
prefix leaves 50 characters for your topic name.
Subscription security
Note: this section only applies to "simple subscriptions."
By default, any user may subscribe to any topic, whether logged in or not, and they will remain subscribed until they close the connection themselves. This can cause a number of security issues; so we give you a method to implement security around subscriptions.
By specifying --subscription-authorization-function [fn]
on the PostGraphile
CLI (or using the subscriptionAuthorizationFunction
option) you can have
PostGraphile call the function you specified to ensure that the user is allowed
to subscribe to the relevant topic. The function must accept one text argument
topic
and must return a string or raise an exception (note: the topic
argument WILL be sent including the postgraphile:
prefix).
A typical implementation will look like this:
CREATE FUNCTION
app_hidden.validate_subscription(topic text)
RETURNS TEXT AS $$
BEGIN
IF ... THEN
RETURN ...::text;
ELSE
RAISE EXCEPTION 'Subscription denied'
USING errcode = '.....';
END IF;
END;
$$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER;
You must define this function with your custom security logic. To use this
function you'd pass the CLI flag:
--subscription-authorization-function app_private.validate_subscription
The text value returned is used to tell the system when to cancel the subscription - if you don't need this functionality then you may return a static unique value, e.g. generate a random UUID (manually) and then return this same UUID over and over from your function, e.g.:
CREATE FUNCTION app_hidden.validate_subscription(topic text)
RETURNS TEXT AS $$
BEGIN
IF ... THEN
RETURN '4A2D27CD-7E67-4585-9AD8-50AF17D98E0B'::text;
ELSE
RAISE EXCEPTION 'Subscription denied' USING errcode = '.....';
END IF;
END;
$$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER;
When a message is published to the topic identified by the return value of this
function (NOTE: this topic will NOT be prefixed with postgraphile:
because it
should be private), the associated subscription will automatically be
terminated.
Naming your topics
Note: this section only applies to "simple subscriptions."
You might want to make the topic a combination of things, for example the
subject type and identifier - e.g. 'channel:123'. If you do this then your
function could determine which subject the user is attempting to subscribe to,
check the user has access to that subject, and finally return a PostgreSQL topic
that will be published to in the event the user is kicked from the channel, e.g.
'channel:123:kick:987'
(assuming '987' is the id of the current user).
Example walk-through
Note: this section only applies to "simple subscriptions."
First, set up a .postgraphilerc.js
containing the following:
module.exports = {
options: {
plugins: ["@graphile/pg-pubsub"],
connection: "postgres:///subs",
schema: ["app_public"],
simpleSubscriptions: true,
},
};
Next, in terminal 1, run:
createdb subs || true
postgraphile
In terminal 2, connect to the subs DB using psql subs
and run the following:
create schema if not exists app_public;
create schema if not exists app_private;
create table if not exists app_public.foo (
id serial primary key,
title text not null
);
create or replace function
app_private.validate_subscription(topic text)
returns text as
$$
select 'CANCEL_ALL_SUBSCRIPTIONS'::text;
$$ language sql stable;
Then using a GraphQL client that supports subscriptions, such as GraphQL Playground, perform the following subscription:
subscription {
listen(topic: "hello") {
relatedNodeId
relatedNode {
nodeId
... on Foo {
id
title
}
}
}
}
You are not expecting an immediate result; first you have to trigger the event.
To do so, back in your psql
session in terminal 2, execute:
insert into app_public.foo (title) values ('Howdy!') returning *;
select pg_notify(
'postgraphile:hello',
json_build_object(
'__node__', json_build_array(
'foos',
(select max(id) from app_public.foo)
)
)::text
);
You should find that the event has been received by the client and the 'Howdy!' node has come through. You can run the above a few more times, or experiment a bit by changing the values if you like.
Finally to cancel the subscription, execute the following SQL:
select pg_notify(
'CANCEL_ALL_SUBSCRIPTIONS',
json_build_object()::text
);
You should notice that your client is no longer subscribed.
Bonus: the SQL commands in this walk-through can be automated with this handy bash script:
#!/bin/bash
set -e
createdb subs || true
psql -1X -v ON_ERROR_STOP=1 subs << HERE
create schema if not exists app_public;
create table if not exists app_public.foo (
id serial primary key,
title text not null
);
create schema if not exists app_private;
create or replace function app_private.validate_subscription(topic text)
returns text as \$\$
select 'CANCEL_ALL_SUBSCRIPTIONS'::text;
\$\$ language sql stable;
HERE
sleep 1
psql -1X -v ON_ERROR_STOP=1 subs << HERE
do \$\$
declare
v_foo app_public.foo;
begin
insert into app_public.foo (title) values ('Howdy!') returning * into v_foo;
perform pg_notify(
'postgraphile:hello',
json_build_object('__node__', json_build_array('foos', v_foo.id))::text
);
end;
\$\$ language plpgsql;
HERE
sleep 3
psql -1X -v ON_ERROR_STOP=1 subs << HERE
do \$\$
declare
v_foo app_public.foo;
begin
insert into app_public.foo (title) values ('Goodbye!') returning * into v_foo;
perform pg_notify(
'postgraphile:hello',
json_build_object('__node__', json_build_array('foos', v_foo.id))::text
);
perform pg_notify(
'CANCEL_ALL_SUBSCRIPTIONS',
json_build_object()::text
);
end;
\$\$ language plpgsql;
HERE
Advanced setup
If you need websockets to be listened for before your first HTTP request comes
in (most people don't need this) then you must create a rawHTTPServer
, mount
your express app
in it, and then add subscription support to the raw server
via the enhanceHttpServerWithSubscriptions
function, as shown below:
const {
postgraphile,
makePluginHook,
enhanceHttpServerWithSubscriptions,
} = require("postgraphile");
const { default: PgPubsub } = require("@graphile/pg-pubsub");
const { createServer } = require("http");
const express = require("express");
const pluginHook = makePluginHook([PgPubsub]);
const app = express();
const rawHTTPServer = createServer(app);
const postgraphileOptions = {
pluginHook,
simpleSubscriptions: true,
websocketMiddlewares: [
// Add whatever middleware you need here, note that
// they should only manipulate properties on req/res,
// they must not sent response data. e.g.:
//
// require('express-session')(),
// require('passport').initialize(),
// require('passport').session(),
],
};
const postgraphileMiddleware = postgraphile(
databaseUrl,
"app_public",
postgraphileOptions,
);
app.use(postgraphileMiddleware);
enhanceHttpServerWithSubscriptions(rawHTTPServer, postgraphileMiddleware);
rawHTTPServer.listen(parseInt(process.env.PORT, 10) || 3000);
The enhanceHttpServerWithSubscriptions
takes two arguments:
- the raw HTTP server from
require('http').createServer()
- the postgraphile middleware (this should be the same middleware that you mount into your Express app)