Short anatomy of node-redis client
I am trying to write a simple Redis client in Rust. As part of that exercise, I have read the source code of the Node.js Redis client. I want to jot down some notes on how it works.
A Redis client opens a TCP connection, and it writes and reads over that connection. We will examine these three things in detail. Opening a connection, writing a command, and reading a response.
Opening a connection
RedisClient
definition in this file.
class RedisClient {
// ....
constructor(options?: RedisClientOptions<M, F, S>) {
super();
this.#options = this.#initiateOptions(options);
this.#queue = this.#initiateQueue();
this.#socket = this.#initiateSocket();
// ....
}
// ....
}
In the constructor, we see that RedisClient
object has two private variables this.#queue
& this.#socket
that are relevant to us.
class RedisClient {
// ....
#initiateSocket(): RedisSocket {
const socketInitiator = async (): Promise<void> => {
const promises = [];
if (this.#selectedDB !== 0) {
promises.push(
this.#queue.addCommand(["SELECT", this.#selectedDB.toString()], {
asap: true,
}),
);
}
if (this.#options?.readonly) {
promises.push(
this.#queue.addCommand(COMMANDS.READONLY.transformArguments(), {
asap: true,
}),
);
}
if (this.#options?.name) {
promises.push(
this.#queue.addCommand(
COMMANDS.CLIENT_SETNAME.transformArguments(this.#options.name),
{ asap: true },
),
);
}
if (this.#options?.username || this.#options?.password) {
promises.push(
this.#queue.addCommand(
COMMANDS.AUTH.transformArguments({
username: this.#options.username,
password: this.#options.password ?? "",
}),
{ asap: true },
),
);
}
const resubscribePromise = this.#queue.resubscribe();
if (resubscribePromise) {
promises.push(resubscribePromise);
}
if (promises.length) {
this.#tick(true);
await Promise.all(promises);
}
};
return new RedisSocket(socketInitiator, this.#options?.socket)
.on("data", (chunk) => this.#queue.onReplyChunk(chunk))
.on("error", (err) => {
this.emit("error", err);
if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) {
this.#queue.flushWaitingForReply(err);
} else {
this.#queue.flushAll(err);
}
})
.on("connect", () => this.emit("connect"))
.on("ready", () => {
this.emit("ready");
this.#tick();
})
.on("reconnecting", () => this.emit("reconnecting"))
.on("drain", () => this.#tick())
.on("end", () => this.emit("end"));
}
// ....
}
initiateSocket
method returns a RedisSocket
object which is turn an EventEmitter
.
The method also pushes some commands onto this.#queue
.
RedisSocket
definition is in this file.
class RedisSocket {
// ...
async #connect(retries: number, hadError?: boolean): Promise<void> {
if (retries > 0 || hadError) {
this.emit("reconnecting");
}
try {
this.#isOpen = true;
this.#socket = await this.#createSocket();
this.#writableNeedDrain = false;
this.emit("connect");
try {
await this.#initiator();
} catch (err) {
this.#socket.destroy();
this.#socket = undefined;
throw err;
}
this.#isReady = true;
this.emit("ready");
} catch (err) {
this.emit("error", err);
const retryIn = (
this.#options?.reconnectStrategy ??
RedisSocket.#defaultReconnectStrategy
)(retries);
if (retryIn instanceof Error) {
this.#isOpen = false;
throw new ReconnectStrategyError(retryIn, err);
}
await promiseTimeout(retryIn);
return this.#connect(retries + 1);
}
}
#createSocket(): Promise<net.Socket | tls.TLSSocket> {
return new Promise((resolve, reject) => {
const { connectEvent, socket } = RedisSocket.#isTlsSocket(this.#options)
? this.#createTlsSocket()
: this.#createNetSocket();
if (this.#options.connectTimeout) {
socket.setTimeout(this.#options.connectTimeout, () =>
socket.destroy(new ConnectionTimeoutError()),
);
}
socket
.setNoDelay(this.#options.noDelay)
.once("error", reject)
.once(connectEvent, () => {
socket
.setTimeout(0)
// https://github.com/nodejs/node/issues/31663
.setKeepAlive(
this.#options.keepAlive !== false,
this.#options.keepAlive || 0,
)
.off("error", reject)
.once("error", (err: Error) => this.#onSocketError(err))
.once("close", (hadError) => {
if (!hadError && this.#isOpen && this.#socket === socket) {
this.#onSocketError(new SocketClosedUnexpectedlyError());
}
})
.on("drain", () => {
this.#writableNeedDrain = false;
this.emit("drain");
})
.on("data", (data) => this.emit("data", data));
resolve(socket);
});
});
}
#createNetSocket(): CreateSocketReturn<net.Socket> {
return {
connectEvent: "connect",
socket: net.connect(this.#options as net.NetConnectOpts), // TODO
};
}
// ...
}
The #createSocket
& #createNetSocket
method finally lead us to TCP connection logic we are looking for.
From Node.js documentation, net
module
The net module provides an asynchronous network API for creating stream-based TCP or IPC servers (net.createServer()) and clients (net.createConnection()).
Writing commands
Once the connection is established, we trigger several callbacks that we have setup so far.
The socket
connected is stored in this.#socket
and the this.#initiator
callback is called (socket.ts L108) which is passed
as the argument to be RedisSocket
constructor.
There are several commands queued from the initiator callback ([socket.ts L224-261]).
Let’s look at this.#queue
from RedisClient
. this.#queue
is an instance of RedisCommandQueue
.
RedisCommandQueue
definition is in this file.
RedisCommandQueue
is backed by two doubly-linked lists (yallist) #waitingToBeSent
and #waitingForReply
.
The names are self explanatory, one queue for keep commands yet to sent and the other for response yet to be recieved.
addCommand<T = RedisCommandRawReply>(args: RedisCommandArguments, options?: QueueCommandOptions): Promise<T> {
if (this.#pubSubState.isActive && !options?.ignorePubSubMode) {
return Promise.reject(new Error('Cannot send commands in PubSub mode'));
} else if (this.#maxLength && this.#waitingToBeSent.length + this.#waitingForReply.length >= this.#maxLength) {
return Promise.reject(new Error('The queue is full'));
} else if (options?.signal?.aborted) {
return Promise.reject(new AbortError());
}
return new Promise((resolve, reject) => {
const node = new LinkedList.Node<CommandWaitingToBeSent>({
args,
chainId: options?.chainId,
returnBuffers: options?.returnBuffers,
resolve,
reject
});
if (options?.signal) {
const listener = () => {
this.#waitingToBeSent.removeNode(node);
node.value.reject(new AbortError());
};
node.value.abort = {
signal: options.signal,
listener
};
// AbortSignal type is incorrent
(options.signal as any).addEventListener('abort', listener, {
once: true
});
}
if (options?.asap) {
this.#waitingToBeSent.unshiftNode(node);
} else {
this.#waitingToBeSent.pushNode(node);
}
});
}
Every command enqueued returns a Promise so it’s caller can recieve the response asynchronously. The command is stored as an object along with the promise’s resolve and reject functions.
One thing to note the queues are using doubly-linked lists so commands if need be can be added to the front of the queue.
All the commands in the socket initiator pass an additional argument { asap: true }
.
The authentication command is the last command enqueued from socket initiator with { asap: true }
so it will be exectued first.
So far we queued commands, Let’s examine how they are sent.
RedisSocket.#socket
will emit a ready
event and RedisSocket
will in turn emit ready
event too.
In RedisClient.#initiateSocket
, RedisSocket
has a callback for ready
event.
return new RedisSocket(socketInitiator, this.#options?.socket)
.on("data", (chunk) => this.#queue.onReplyChunk(chunk))
.on("error", (err) => {
this.emit("error", err);
if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) {
this.#queue.flushWaitingForReply(err);
} else {
this.#queue.flushAll(err);
}
})
.on("connect", () => this.emit("connect"))
.on("ready", () => {
this.emit("ready");
this.#tick();
})
.on("reconnecting", () => this.emit("reconnecting"))
.on("drain", () => this.#tick())
.on("end", () => this.emit("end"));
Let’s examine RedisClient.#tick
method and related methods
#tick(force = false): void {
if (this.#socket.writableNeedDrain || (!force && !this.#socket.isReady)) {
return;
}
this.#socket.cork();
while (!this.#socket.writableNeedDrain) {
const args = this.#queue.getCommandToSend();
if (args === undefined) break;
this.#socket.writeCommand(args);
}
}
getCommandToSend(): RedisCommandArguments | undefined {
const toSend = this.#waitingToBeSent.shift();
if (!toSend) return;
let encoded: RedisCommandArguments;
try {
encoded = encodeCommand(toSend.args);
} catch (err) {
toSend.reject(err);
return;
}
this.#waitingForReply.push({
resolve: toSend.resolve,
reject: toSend.reject,
channelsCounter: toSend.channelsCounter,
returnBuffers: toSend.returnBuffers
});
this.#chainInExecution = toSend.chainId;
return encoded;
}
writeCommand(args: RedisCommandArguments): void {
if (!this.#socket) {
throw new ClientClosedError();
}
for (const toWrite of args) {
this.#writableNeedDrain = !this.#socket.write(toWrite);
}
}
this.#socket.cork()
is a rabbit hole on it’s own. This blog post explains it in a good detail.
RedisCommandQueue.getCommandToSend
method will pop the first command off the the queue and encode the command in RESP2 protocol.
RedisCommandQueue.#waitingForReply
queue is enqueued with node containing RedisCommandQueue.addCommand
return value promise’s
resolve
and reject
so they can be called once a response is recieved.
The encoded command is written to the socket in RedisSocket.writeCommand
with this.#socket.write
.
RedisSocket.#writableNeedDrain
tracks if the TCP socket’s buffer is full and stops RedisClient.#tick
from enqueueing more commands.
Returns true if the entire data was flushed successfully to the kernel buffer. Returns false if all or part of the data was queued in user memory. ‘drain’ will be emitted when the buffer is again free.
Is this handling backpressure? Yep. It seems so. Read this great blog post.
.on('drain', () => this.#tick())
RedisClient.#tick
will enqueue command until RedisSocket.#writableNeedDrain
is true.
RedisClient.#tick
will be called again when RedisSocket
will emit drain
.
Reading a response
return new RedisSocket(socketInitiator, this.#options?.socket).on(
"data",
(chunk) => this.#queue.onReplyChunk(chunk),
);
onReplyChunk(chunk: Buffer): void {
this.#decoder.write(chunk);
}
RedisSocket
will queue data coming from the TCP socket to RedisCommandQueue.onReplyChunk
method.
#decoder = new RESP2Decoder({
returnStringsAsBuffers: () => {
return (
!!this.#waitingForReply.head?.value.returnBuffers ||
this.#pubSubState.isActive
);
},
onReply: (reply) => {
if (this.#handlePubSubReply(reply)) {
return;
} else if (!this.#waitingForReply.length) {
throw new Error("Got an unexpected reply from Redis");
}
const { resolve, reject } = this.#waitingForReply.shift()!;
if (reply instanceof ErrorReply) {
reject(reply);
} else {
resolve(reply);
}
},
});
RESP2Decoder
will decode the response and call onReply
.
RedisCommandQueue.#waitingForReply
will pop a node so that it can finally settle to the Promise that was created in RedisCommandQueue.addCommand
.
What a journey! I feel great understanding how all this works.