An intro to writing a native DB driver in Scala
Speaking directly to Cassandra over its native protocol using ZIO & Scodec
Sooner or later, most developers need to talk to a database from within their application. We then reach for an established Object-Relational Mapping or Functional-Relational Mapping library of our choice.
Those are frameworks and libraries that help us interact with the database: they handle the mapping between objects and the database’s tables (or graphs, depending on your database), help us with query construction, and much more.
These usually depend on another layer, a driver API, which is often provided by the database vendor. For example, for Postgres you could use its JDBC implementation. These drivers are fairly low level and map the capabilities of the database to an interface that is exposed to programmers. They also handle the communication protocol: when you ask the database “give me all users”, you are not interested in the specific byte encoding of that query.
In this article, we’ll write a part of a simple driver that communicates with Cassandra, using non-blocking, asynchronous purely functional code, with the help of ZIO.
The bulk of the article will be about defining building blocks that we can use to build a full driver. We’ll build it from the bottom up, starting with our payloads and their serialization, followed by sockets to send them across the wire. We’ll finish with an actual example that sends two real messages and receives responses.
A reasonable question is, why would you ever write such a database driver?
- you work for a database vendor (that is about to release a new fantastic database to the world) and need a driver :-)
- the existing driver doesn’t meet the quality standards your codebase enforces, e.g. random exceptions, blocking operations all over the place, disregard for proper thread management, public mutable state, and so on.
- your driver lacks capabilities that the underlying database supports, such as the capability to stream your data out, publish-subscribe to changes, and more.
- the existing driver doesn’t fit the paradigm your codebase enforces, e.g. you need a purely functional driver.
- education — you are interested in how things work or could work under the hood.
This does not only apply to database drivers. During your career, you may come across times when you need to interact with some system that does not expose some typical HTTP REST API (although this is nowadays less common) — such as some machinery, robots, etc. If you work for a large corporation, this could be some proprietary or in-house solution.
Writing a driver can be a difficult yet rewarding endeavor. At best you have a specification at hand; at worst you have to reverse engineer the protocol by sniffing packets.
I strongly recommend checking out the full code if you plan to follow along here.
If there is a single thing to take away from this article, it is to not be afraid of doing fairly low-level work with Scala & FP. The language and its powerful libraries have your (and my) back, and they make the whole process ever so enjoyable. :-)
Setting the stage
Topics that we’ll go through:
- how to serialize and deserialize your data to and from bytes using scodec
- how to use ZIO-NIO for low-level socket communication
- how to use ZIO to power all the above
- how to model your data types and structure code to cover the protocol
We will create a functioning vertical slice: sending and receiving some messages from Cassandra using its native protocol. You can find the specification here. Don’t worry about reading it all; I’ll guide you through the sections that matter for our sample. We will develop the code in an extensible manner, so that covering more functionality later should not be too difficult. I will link directly to the relevant sections of the spec as needed.
We will need three dependencies — ZIO for functional programming, ZIO-NIO for sending bytes around the wire, and scodec so we can define how our case classes look when they are serialized into bytes. Let’s add them to our build.sbt.
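A possible build.sbt fragment is sketched below. The version numbers are assumptions (a ZIO 1.x line with a matching ZIO-NIO release candidate, and scodec-core 1.x, which is the line that integrates with shapeless); check what is current for your setup.

```scala
// build.sbt (versions are illustrative assumptions, not taken from the article)
scalaVersion := "2.13.8"

libraryDependencies ++= Seq(
  "dev.zio"    %% "zio"         % "1.0.18",     // effects and resource management
  "dev.zio"    %% "zio-nio"     % "1.0.0-RC11", // sockets on top of Java NIO
  "org.scodec" %% "scodec-core" % "1.11.10"     // codecs between case classes and bits
)
```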
Bits, bytes, and codecs
A Cassandra envelope has various fields, such as a Version field, which takes 8 bits, followed by a Flags field, taking another 8 bits, and so on. At this point, we don’t know what version or flags mean. Looking at the envelope, we only see how many bits they take. The layouts of the underlying types are described in section 3. They are the smallest building blocks we can use to eventually build up to the envelope.
When the spec says “stream id is of type short”, we have to know precisely what the layout of a short is: how many bytes it occupies, which endianness it uses, and so on. Let’s implement at least the most important ones from section 3.
Create a protocol package with some codecs, using the fantastic scodec library. Codecs define how to encode data into bytes and decode it back. scodec gives us lots of predefined combinators on these codecs that will come in very handy as we implement more complex types.
Reading from the spec:
[int] A 4 bytes integer
[long] A 8 bytes integer
would correspond to the following codecs (scodec sizes are given in bits, while the spec defines types in bytes):
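A minimal sketch of what those two codecs might look like, assuming scodec-core 1.x. The object name PrimitiveCodecs is made up for this example. scodec already ships big-endian int32 and int64, so the spec types are little more than aliases:

```scala
import scodec.Codec
import scodec.codecs.{int32, int64}

object PrimitiveCodecs {
  // [int]: 4 bytes = 32 bits, big-endian, signed
  val int: Codec[Int] = int32

  // [long]: 8 bytes = 64 bits, big-endian, signed
  val long: Codec[Long] = int64
}
```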
You can compose these codecs into larger codecs (as you would LEGO pieces). Let’s say you wanted to encode, using the above codecs, int followed by long:
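A sketch of that composition, assuming scodec-core 1.x, where ~ pairs two codecs into a codec of a tuple (ComposeExample is a hypothetical name):

```scala
import scodec.{Attempt, Codec}
import scodec.bits.BitVector
import scodec.codecs.{int32, int64}

object ComposeExample {
  // An int followed by a long: 4 + 8 = 12 bytes on the wire
  val intThenLong: Codec[(Int, Long)] = int32 ~ int64

  // Encoding can fail, hence the Attempt wrapper
  val encoded: Attempt[BitVector] = intThenLong.encode((5, 6L))
}
```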
We get back an Attempt — representing either the success or failure of the encoding operation. Decoding follows a similar pattern:
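Decoding, sketched under the same assumption (scodec-core 1.x). The trailing ff byte is there only to show what ends up in the remainder, and DecodeExample is a hypothetical name:

```scala
import scodec.{Attempt, DecodeResult}
import scodec.bits._
import scodec.codecs.{int32, int64}

object DecodeExample {
  // 4 bytes of int, 8 bytes of long, plus one extra byte that is not consumed
  val input: BitVector = hex"000000050000000000000006ff".bits

  val decoded: Attempt[DecodeResult[(Int, Long)]] = (int32 ~ int64).decode(input)
}
```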
Our Attempt now contains the result of decoding: the decoded value and the remaining bits that were not consumed.
Let’s define the relevant types from the spec. I suggest opening section 3 and comparing each type's description against the following implementation:
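One way these types could be written, as a sketch on top of scodec-core 1.x. The object name CassandraTypes is an assumption, and only the types needed later in the article are covered:

```scala
import scodec.Codec
import scodec.codecs.{listOfN, uint16, utf8, variableSizeBytes}

object CassandraTypes {
  // [short]: a 2-byte unsigned integer
  val short: Codec[Int] = uint16

  // [string]: a [short] n, followed by n bytes of UTF-8
  val string: Codec[String] = variableSizeBytes(short, utf8)

  // [string list]: a [short] n, followed by n [string]
  val stringList: Codec[List[String]] = listOfN(short, string)

  // [string map]: a [short] n, followed by n pairs <k><v>, both [string]
  val stringMap: Codec[Map[String, String]] =
    listOfN(short, string ~ string).xmap[Map[String, String]](_.toMap, _.toList)

  // [string multimap]: a [short] n, followed by n pairs <k><v>,
  // where k is a [string] and v is a [string list]
  val stringMultiMap: Codec[Map[String, List[String]]] =
    listOfN(short, string ~ stringList).xmap[Map[String, List[String]]](_.toMap, _.toList)
}
```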
Cassandra defines the full envelope as follows:
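0         8        16        24        32         40
+---------+---------+---------+---------+---------+
| version |  flags  |      stream       | opcode  |
+---------+---------+---------+---------+---------+
|                length                 |
+---------+---------+---------+---------+
|                                       |
.            ...  body ...              .
.                                       .
+----------------------------------------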
We have basic primitive types ready. We could now jump straight into implementing a codec for the full message, but that wouldn’t be good enough. We’d end up with a type represented by something like a byte (for version), a byte (for flags), two bytes (for stream)… We want to attach semantics to these bytes, based on the spec. This will help us later, when we work with these types at a higher level and apply the protocol's rules to them (for example: to compress the payload, set the compression bit, which is the lowest bit of the flags byte).
Let’s implement the envelope by parts. Don’t be scared by the number of sections or code that follows — once you understand a single part, you understand all of them! :-)
The following sections will be very data-oriented, but you know, they say data rules the world. Though I don’t think they meant envelopes that are at most a few bytes long ;-)
We’ll start with the parsing of an envelope header. That is everything except the body in the diagram above.
| version |
Version is the first real data type that is composed of multiple parts (each with its own semantics), even though it’s only a single byte! According to the spec, the first bit controls the direction of the message, whether it’s a request or a response. The remaining 7 bits designate the version of the protocol. We want our codec to consume one bit and translate it to the direction, followed by 7 bits, translated into the version (this will fit into a Byte). Let’s model it as a regular set of Scala classes, and define a codec for each part:
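A sketch of how this could be modelled, assuming scodec-core 1.x. MessageDirection appears later in the article; ProtocolVersion and the field names are assumptions made for this example:

```scala
import scodec.Codec
import scodec.codecs.{bool, uint}

sealed trait MessageDirection
object MessageDirection {
  case object Request  extends MessageDirection
  case object Response extends MessageDirection

  // First bit of the byte: 0 = request, 1 = response
  val codec: Codec[MessageDirection] = bool.xmap[MessageDirection](
    bit => if (bit) Response else Request,
    dir => dir == Response
  )
}

final case class ProtocolVersion(value: Byte)
object ProtocolVersion {
  // The remaining 7 bits, read as an unsigned number
  val codec: Codec[ProtocolVersion] =
    uint(7).xmap[ProtocolVersion](i => ProtocolVersion(i.toByte), _.value.toInt)
}

final case class Version(direction: MessageDirection, version: ProtocolVersion)
object Version {
  // 1 bit of direction followed by 7 bits of version: exactly one byte
  val codec: Codec[Version] =
    (MessageDirection.codec ~ ProtocolVersion.codec).xmap[Version](
      { case (direction, version) => Version(direction, version) },
      v => (v.direction, v.version)
    )
}
```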
If you haven’t encountered xmap before, it's map & contramap together: we specify how to go from type A to type B, and how to go back from type B to A as well.
| flags |
Flags give various meanings to the envelope. Each flag can be turned on or off by flipping its bit. Cassandra currently supports five flags that fit into a single byte, with the following layout:
8  flags byte  16
x x x 1 1 1 1 1
      | | | | ^- compression flag
      | | | ^--- tracing flag
      | | ^----- custom payload flag
      | ^------- warning flag
      ^--------- use beta features flag
^ ^ ^ ---------- unused in current Cassandra
We’ll define a FlagsPayload class to wrap it and give us a nice API for accessing flags from the byte. Let’s also define FlagType, which will give us a typesafe enumeration of the possible flags.
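A sketch of the two types, assuming scodec-core 1.x. FlagsPayload and FlagType are the article's names; the method names (isSet, enabled, set) and the mask field are assumptions. The masks follow the layout above, with compression as the lowest bit:

```scala
import scodec.Codec
import scodec.codecs.byte

sealed abstract class FlagType(val mask: Int)
object FlagType {
  case object Compression   extends FlagType(0x01)
  case object Tracing       extends FlagType(0x02)
  case object CustomPayload extends FlagType(0x04)
  case object Warning       extends FlagType(0x08)
  case object UseBeta       extends FlagType(0x10)

  val all: List[FlagType] = List(Compression, Tracing, CustomPayload, Warning, UseBeta)
}

final case class FlagsPayload(raw: Byte) {
  // A flag is on when its bit is set in the raw byte
  def isSet(flag: FlagType): Boolean = (raw & flag.mask) != 0

  // All flags that are currently on, in mask order
  def enabled: List[FlagType] = FlagType.all.filter(isSet)

  // Returns a copy with the given flag switched on
  def set(flag: FlagType): FlagsPayload = FlagsPayload((raw | flag.mask).toByte)
}

object FlagsPayload {
  val empty: FlagsPayload = FlagsPayload(0.toByte)

  // On the wire the flags are just one byte
  val codec: Codec[FlagsPayload] = byte.xmap[FlagsPayload](FlagsPayload(_), _.raw)
}
```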
I don’t know about you, but I’d much rather match on FlagType than against bytes :-)
Disclaimer: the name of the next field, stream, clashes with our favorite streams that represent a flow of data. The two have nothing in common.
16 24 32
| stream |
Stream is a number. It is set by the client (most usually) and denotes an id of the message. The response (to this request) that you receive from the server will bear the same id. This allows you to pair requests with responses. This is important because the protocol is asynchronous and you can send multiple requests, and get responses in a different order.
A stream id chosen by the client is always non-negative. The server uses negative values for messages it initiates itself, such as events the client has subscribed to (so not a traditional request-response). Since the id takes 16 bits and one bit is reserved for the sign, the protocol supports 2¹⁵ = 32768 concurrent requests.
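As a sketch, the stream id can be wrapped in its own type on top of scodec's signed 16-bit codec (assuming scodec-core 1.x; the field name id is an assumption):

```scala
import scodec.Codec
import scodec.codecs.int16

final case class Stream(id: Short)
object Stream {
  // 16 bits, signed: negative ids are used by the server for events
  val codec: Codec[Stream] =
    int16.xmap[Stream](i => Stream(i.toShort), _.id.toInt)
}
```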
| opcode |
OpCode (as in Operation Code) defines the meaning of the envelope. Every request is tagged with this code. We use it to distinguish the contents of the body: the body of an authentication request surely differs from the body of a query for the contents of some Cassandra table.
We will define it only for some operations, but feel free to take a look at the spec and try to support all of them!
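A sketch covering just the four opcodes this article needs, assuming scodec-core 1.x. OpOptions, OpSupported and OpReady are the article's names; OpStartup and the all list are assumptions. The numeric values are the ones the spec assigns to STARTUP, READY, OPTIONS and SUPPORTED:

```scala
import scodec.{Attempt, Codec, Err}
import scodec.codecs.uint8

sealed abstract class OpCode(val code: Int)
object OpCode {
  case object OpStartup   extends OpCode(0x01)
  case object OpReady     extends OpCode(0x02)
  case object OpOptions   extends OpCode(0x05)
  case object OpSupported extends OpCode(0x06)

  val all: List[OpCode] = List(OpStartup, OpReady, OpOptions, OpSupported)

  val codec: Codec[OpCode] = uint8.exmap[OpCode](
    // Decoding fails for any byte we have no case object for
    i => Attempt.fromOption(all.find(_.code == i), Err(s"Unknown opcode: $i")),
    // Encoding always succeeds
    op => Attempt.successful(op.code)
  )
}
```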
The new combinator here might be exmap, which is the same as the xmap we used before, except that it allows the operation to fail. In our case, we fail if our byte doesn't have any case object representation. In other words, if we have received some byte that we don't know the meaning of.
0 8 16 24 32
| length |
. ... body ... .
Length is an integer that denotes the length of the body in bytes.
These datatypes together create a header. Let’s combine them into a single case class. In one of the first sections I’ve shown you how to combine them using the symbol
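A sketch of the tuple-based version, assuming the symbol in question is scodec's ~ combinator (scodec-core 1.x). To keep the example self-contained, the four fields use primitive codecs as stand-ins for the Version, FlagsPayload, Stream and OpCode types described above:

```scala
import scodec.Codec
import scodec.codecs.{int16, uint8}

// Stand-in header: raw numbers instead of the richer field types
final case class Header(version: Int, flags: Int, stream: Int, opCode: Int)

object Header {
  // ~ is left-associative, so four codecs give a left-nested tuple
  val codec: Codec[Header] = (uint8 ~ uint8 ~ int16 ~ uint8).xmap[Header](
    { case (((v, f), sid), op) => Header(v, f, sid, op) },
    h => (((h.version, h.flags), h.stream), h.opCode)
  )
}
```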
It’s a bit clumsy, all those brackets such as in case (((v, f), sid), op). If you were to combine even more codecs and you don't have a LISP background, you probably would not enjoy it :-)
Let’s use shapeless to make it a bit nicer. You might be wondering where the dependency is coming from, since we didn’t declare it. It comes in transitively through scodec, which integrates nicely with it.
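A sketch of the shapeless-based version, assuming scodec-core 1.x, where :: builds an HList codec and as maps it onto a case class. As before, primitive codecs stand in for the richer field types:

```scala
import scodec._
import scodec.codecs._

// Stand-in header: raw numbers instead of the richer field types
final case class Header(version: Int, flags: Int, stream: Int, opCode: Int)

object Header {
  // No nested tuples: the field order of the case class drives the mapping
  val codec: Codec[Header] = (uint8 :: uint8 :: int16 :: uint8).as[Header]
}
```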
That’s nicer. Now we have everything to define a request and a response.
Requests and their encoding
Let’s start by defining a Frame. Every frame contains a header. The frame is a request (client initiates some operation) or response (server’s response to the operation).
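A sketch of such a hierarchy. Frame, FrameRequest and FrameResponse are the article's names; the Header here is a simplified stand-in for the one built in the previous section:

```scala
// Stand-in for the Header defined earlier
final case class Header(version: Int, flags: Int, stream: Int, opCode: Int)

// Every frame carries a header
sealed trait Frame { def header: Header }

trait FrameRequest  extends Frame // the client initiates some operation
trait FrameResponse extends Frame // the server's response to the operation
```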
This represents our basic hierarchy. We’ll define requests as FrameRequest descendants and responses under FrameResponse. We will want to encode requests into bits that Cassandra understands. Let’s create a typeclass that gives an encoding capability to our requests.
We can implement an encoder for any type that is a FrameRequest, because we know the header of every such type. We encode the header, the body, and the length. The body has to be encoded before the length can be written, because the length field holds the size of the encoded body.
Let’s define two requests, an OptionsRequest and a StartupRequest. The OptionsRequest can be used to find out which options the server supports (protocol versions, compression, etc.). The StartupRequest actually initializes the connection, and the server should respond with a ReadyResponse.
OptionsRequest doesn’t have a body, so we define an encoder that doesn’t encode any actual bits. Notice how our header specifies MessageDirection.Request as well as OpCode.OpOptions. Those are very important; after all, we are sending an OptionsRequest :-).
We move on to StartupRequest. Its body is defined by the spec as a StringMap, for which we implemented a codec in an earlier section.
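A sketch of the typeclass and the generic frame encoding, assuming scodec-core 1.x. RequestEncoder is the article's name; encodeBody and encodeFrame are assumed method names, and the Header is the simplified stand-in with primitive fields:

```scala
import scodec.{Attempt, Codec}
import scodec.bits.BitVector
import scodec.codecs.{int16, int32, uint8}

final case class Header(version: Int, flags: Int, stream: Int, opCode: Int)
object Header {
  val codec: Codec[Header] = (uint8 ~ uint8 ~ int16 ~ uint8).xmap[Header](
    { case (((v, f), sid), op) => Header(v, f, sid, op) },
    h => (((h.version, h.flags), h.stream), h.opCode)
  )
}

trait FrameRequest { def header: Header }

// Typeclass: how to turn the body of a request of type A into bits
trait RequestEncoder[A <: FrameRequest] {
  def encodeBody(request: A): Attempt[BitVector]
}

object RequestEncoder {
  // Wire order is header, length, body. The body is encoded first
  // so that its size in bytes is known when the length is written.
  def encodeFrame[A <: FrameRequest](request: A)(
      implicit enc: RequestEncoder[A]
  ): Attempt[BitVector] =
    for {
      header <- Header.codec.encode(request.header)
      body   <- enc.encodeBody(request)
      length <- int32.encode(body.bytes.size.toInt)
    } yield header ++ length ++ body
}
```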
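Both requests could be sketched as follows, assuming scodec-core 1.x. To stay self-contained, the block redefines simplified stand-ins for Header, OpCode, MessageDirection and RequestEncoder; protocol version 4, the constructor parameters, and the name OpStartup are assumptions:

```scala
import scodec.{Attempt, Codec}
import scodec.bits.BitVector
import scodec.codecs.{listOfN, uint16, utf8, variableSizeBytes}

sealed trait MessageDirection
object MessageDirection {
  case object Request  extends MessageDirection
  case object Response extends MessageDirection
}

sealed abstract class OpCode(val code: Int)
object OpCode {
  case object OpStartup extends OpCode(0x01)
  case object OpOptions extends OpCode(0x05)
}

final case class Header(direction: MessageDirection, version: Int, flags: Int, stream: Int, opCode: OpCode)

trait FrameRequest { def header: Header }
trait RequestEncoder[A <: FrameRequest] { def encodeBody(request: A): Attempt[BitVector] }

final case class OptionsRequest(stream: Int) extends FrameRequest {
  val header: Header = Header(MessageDirection.Request, 4, 0, stream, OpCode.OpOptions)
}
object OptionsRequest {
  // No body: encoding produces zero bits
  implicit val encoder: RequestEncoder[OptionsRequest] = new RequestEncoder[OptionsRequest] {
    def encodeBody(request: OptionsRequest): Attempt[BitVector] = Attempt.successful(BitVector.empty)
  }
}

final case class StartupRequest(stream: Int, options: Map[String, String]) extends FrameRequest {
  val header: Header = Header(MessageDirection.Request, 4, 0, stream, OpCode.OpStartup)
}
object StartupRequest {
  private val string: Codec[String] = variableSizeBytes(uint16, utf8)
  private val stringMap: Codec[Map[String, String]] =
    listOfN(uint16, string ~ string).xmap[Map[String, String]](_.toMap, _.toList)

  // The body is a [string map] of startup options
  implicit val encoder: RequestEncoder[StartupRequest] = new RequestEncoder[StartupRequest] {
    def encodeBody(request: StartupRequest): Attempt[BitVector] = stringMap.encode(request.options)
  }
}
```

A StartupRequest would then be built with something like StartupRequest(1, Map("CQL_VERSION" -> "3.0.0")).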
Responses and their decoding
Let’s define responses to our requests. For OptionsRequest, we get back a response with the OpSupported opcode. Let’s call it SupportedResponse. For StartupRequest, we receive a response with the OpReady opcode. We’ll call it, surprise, surprise, ReadyResponse. ReadyResponse does not contain anything besides its header (it has no body).
We don’t need to create instances of our RequestEncoder typeclass because these are responses and we will never be encoding them into bytes.
We still don’t know how to decode responses from bytes, though. We need to read the header to see the OpCode, and based on that we read only a certain number of bytes, matching the payload of the message defined by the OpCode. Let’s define the decoder based on data from the header:
The decoder for the body of an OpSupported response has to read a stringMultiMap. For OpReady there is no body, so we create a decoder that directly provides a response with the header that we already have.
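A sketch of such a header-driven decoder, assuming scodec-core 1.x. decodeHeader, SupportedResponse and ReadyResponse are the article's names; the trimmed-down Header, the options field, and OpError (included only to show the failure branch) are assumptions:

```scala
import scodec.{Codec, Decoder, Err}
import scodec.codecs.{fail, listOfN, provide, uint16, utf8, variableSizeBytes}

sealed abstract class OpCode(val code: Int)
object OpCode {
  case object OpError     extends OpCode(0x00)
  case object OpReady     extends OpCode(0x02)
  case object OpSupported extends OpCode(0x06)
}

// Trimmed-down stand-in for the full header
final case class Header(stream: Int, opCode: OpCode)

trait FrameResponse { def header: Header }
final case class ReadyResponse(header: Header) extends FrameResponse
final case class SupportedResponse(header: Header, options: Map[String, List[String]])
    extends FrameResponse

object FrameResponse {
  private val string: Codec[String] = variableSizeBytes(uint16, utf8)

  val stringMultiMap: Codec[Map[String, List[String]]] =
    listOfN(uint16, string ~ listOfN(uint16, string))
      .xmap[Map[String, List[String]]](_.toMap, _.toList)

  // Pick the body decoder based on the opcode in the already-decoded header
  def decodeHeader(header: Header): Decoder[FrameResponse] =
    header.opCode match {
      case OpCode.OpSupported =>
        stringMultiMap.asDecoder.map(options => SupportedResponse(header, options))
      case OpCode.OpReady =>
        provide(ReadyResponse(header)) // no body: consumes nothing
      case other =>
        fail[FrameResponse](Err(s"No body decoder for opcode $other"))
    }
}
```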
Data over network
We have now implemented all the required encoders and decoders, ranging from primitive types such as int, through data representing Flags, Stream and so on, up to a full envelope containing Header and Body. Let’s build the pieces that use them to send data over the wire.
We will create two types of sockets. One to wrap low level Java socket into one that understands structures that our encoding library Scodec uses. The second one (built on top of the first one) encodes a request and decodes a response.
ZIO-NIO provides low-level constructs around Java NIO that we will use to send data. It works with ByteBuffers. Scodec’s type of choice is BitVector. As we read the data, we need to convert between the two. We’ll keep these conversions in their own layer so that each layer has a single concern.
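The conversion itself is small. A sketch using only scodec-bits and java.nio (the object and method names are made up for this example, and no ZIO-NIO calls are shown here):

```scala
import java.nio.ByteBuffer
import scodec.bits.BitVector

object Conversions {
  // BitVector -> ByteBuffer, for writing to a socket
  def toBuffer(bits: BitVector): ByteBuffer = bits.toByteBuffer

  // ByteBuffer -> BitVector, for handing received bytes to a decoder
  def fromBuffer(buffer: ByteBuffer): BitVector = BitVector(buffer)
}
```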
We’ll create a socket address, which is an effect. We open the socket using the address, which gives us a Managed type that takes care of resource cleanup once we are done using it. The TaskManaged type that you see is essentially a managed type without any dependencies that can fail with a Throwable. Less jargon and more code:
type TaskManaged[+A] = ZManaged[Any, Throwable, A]
Let’s test this socket, just for fun. Open a local socket on port 9042. If you are on Linux, you can use nc -l 9042 | hexdump -C. Let's send the number 11 over the wire.
We have received:
00 00 00 0b
0b represents 11 in hexadecimal representation. Our BitVectorSocket seems to be working fine (if you're a pessimist add "it works for number 11").
The final missing piece is a socket that can send FrameRequest and receive FrameResponse.
We can send anything that is a FrameRequest and has a RequestEncoder typeclass instance. Let’s implement it. It will use our BitVectorSocket underneath, therefore it will also need to be of type TaskManaged.
We prepare a method to help us convert Attempts to ZIO’s Tasks. If encoding or decoding fails, we want to fail the task. Sending a message is simple — we encode the data to BitVector, and write it to the socket.
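The Attempt-to-Task helper could look roughly like this, assuming scodec-core 1.x and ZIO 1.x. The names AttemptSyntax and toTask, and the choice of RuntimeException for the failure, are assumptions:

```scala
import scodec.Attempt
import zio.{Task, ZIO}

object AttemptSyntax {
  // A failed Attempt fails the Task; a successful one completes it
  def toTask[A](attempt: Attempt[A]): Task[A] =
    attempt.fold[Task[A]](
      err => ZIO.fail(new RuntimeException(err.messageWithContext)),
      value => ZIO.succeed(value)
    )
}
```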
For receiving, we read and decode the header and bodyLength. Based on bodyLength, we read the bits that follow. Do you remember how the header contains the OpCode that gives semantics to the body? We implemented a method that gives us the right decoder based on this header. We called it def decodeHeader(header: Header): Decoder[FrameResponse]. We use it to obtain the required decoder, which we then use to read the body and get a FrameResponse.
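The receive steps can be sketched as a pure function over bits that have already arrived, leaving the socket out entirely (assuming scodec-core 1.x). FrameReader and readFrame are hypothetical names, the Header is the simplified stand-in, and bodyDecoder plays the role of decodeHeader:

```scala
import scodec.{Attempt, Codec, Decoder}
import scodec.bits.BitVector
import scodec.codecs.{int16, int32, uint8}

final case class Header(version: Int, flags: Int, stream: Int, opCode: Int)
object Header {
  val codec: Codec[Header] = (uint8 ~ uint8 ~ int16 ~ uint8).xmap[Header](
    { case (((v, f), sid), op) => Header(v, f, sid, op) },
    h => (((h.version, h.flags), h.stream), h.opCode)
  )
}

object FrameReader {
  val HeaderBits = 40L // version + flags + stream + opcode
  val LengthBits = 32L // [int] body length in bytes

  def readFrame[R](bits: BitVector, bodyDecoder: Header => Decoder[R]): Attempt[R] =
    for {
      // 1. fixed-size header
      header <- Header.codec.decodeValue(bits.take(HeaderBits))
      // 2. body length, in bytes
      length <- int32.decodeValue(bits.drop(HeaderBits).take(LengthBits))
      // 3. exactly `length` bytes of body
      body    = bits.drop(HeaderBits + LengthBits).take(length * 8L)
      // 4. the decoder is chosen from the header
      result <- bodyDecoder(header).decodeValue(body)
    } yield result
}
```

On a real socket, steps 1 and 2 would correspond to reading 9 bytes, and step 3 to reading the number of bytes announced by the length field.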
Let’s test it by sending a couple of messages.
Sending real requests to Cassandra
First, make sure you have Cassandra running with port 9042 exposed.
Let’s construct requests and send them:
After running it, you should see something like this in the console (depending on your Cassandra version):
Map(PROTOCOL_VERSIONS -> List(3/v3, 4/v4, 5/v5-beta),
COMPRESSION -> List(snappy, lz4),
CQL_VERSION -> List(3.4.4)))
Map(CQL_VERSION -> 3.0.0))
Congratulations — you have successfully sent a message to Cassandra that understood it, and sent you back a response! Isn’t it exciting?
This is where the real fun starts. We are able to talk to our database, and now it’s a matter of implementing further types, operations, requests, responses, and protocol logic.
Notice how when we receive a message, we receive a generic FrameResponse. We still lack actual protocol logic. For example, for an OptionsRequest we always get back a SupportedResponse, but this is not encoded anywhere. All we can do is send some FrameRequest and receive some FrameResponse.
In our example, we used the driver synchronously — we sent a message, and expected a response. A good driver is able to handle this communication asynchronously (the protocol supports it) — you can have multiple requests in flight in parallel. The driver should pair them based on their Stream id.
- break the protocol (i.e. change expected sizes of payloads) and see what errors you get. Do you get any response from Cassandra?
- define remaining types from section 3
- implement missing OpCodes
- support additional requests and responses
- create a layer that understands protocol defined by the spec — i.e. after StartupRequest, we get back typed response of either ReadyResponse or an AuthenticateResponse, instead of a generic FrameResponse.
- add support for asynchronous communication — the ability to have multiple requests in flight
Maybe I will tackle some of these in one of my following posts.
We have defined our datatypes, opened a socket, and exchanged a few messages with Cassandra. We have a strong foundation and layers that we can extend with additional operations and capabilities to improve the driver.
As a closing remark, I would like to point out a kickass library that does something similar, but for Postgres — skunk. It was my initial motivation for this blog post, and I highly recommend you check it out.
I hope you enjoyed the article at least as much as I enjoyed writing it. If you have any questions, feel free to get in touch on Twitter: @MarekKadek.