Using ZIO Kafka with offset storage in Postgres for transactional processing

Combining Scala functional libraries for fun and productivity

Scala often has a reputation for being complicated to write and set up, and for requiring a lot of boilerplate. The same goes for functional programming. My goal is to show you the opposite: even with limited knowledge of the following topics, you can productively use competing libraries from the same field, and they fit together well. You’ll see how easily you can treat Kafka topics as streams of data, how simply you can compose SQL statements into a bigger transaction, and, overall, how productive you can be in today’s Scala ecosystem.

This article assumes only a very limited knowledge of Scala, SBT, Kafka, Postgres, Docker, and functional programming. If I did my homework right, you shouldn’t feel intimidated by any of these technologies at any point. Before using anything, I’ll give the briefest description that I think is sufficient for you, dear reader. If you already know these technologies, feel free to skip those descriptions, as I’ll be hugely simplifying :-)

The overall program is short and concise, and I believe that no matter how experienced you are with these topics, you might discover something useful. In the article you’ll find the following:

  • how to use ZIO Kafka to produce and consume messages
  • how to serialize and deserialize our data to and from JSON
  • how to use Skunk, a native, functional, non-blocking Scala driver for Postgres, to set up and query the schema
  • how to manually commit Kafka partition offsets to Postgres within a transaction
  • how nicely Cats and ZIO play together

Following code in a blog post can get a bit confusing, so I recommend checking out the runnable code in my GitHub project and following along.

10.5.2021 Update

The following text has been updated to Cats Effect 3. The previous code can be found here.

Setting up the stage

We have bank accounts. Each bank account has an owner’s name and a current balance in dollars. Account owners want to transfer money among themselves: Peter might want to send $300 to me, and I might want to send some money to my brother, Marius. We issue these commands to a bank, and once they are processed, our balances change. If something goes wrong during processing, neither the sender nor the receiver observes any change to their bank account, at any time.

To have a concrete example, let’s define what such a command could look like: {"from": "Peter", "to": "Marek", "amount": 300}. This means that Peter is sending $300 to Marek, so once it is processed, Peter's balance should be -$300 (how unfortunate for Peter), while Marek's would be +$300. We could designate the type of this command, for example "type": "TransferBalance", but since we support only a single command, we don't need to.

Once we issue the command, it is enqueued into Kafka. Kafka is a distributed queue, at least for this example ;-). You have probably used a queue data structure before; think of a Kafka topic as such a queue, one that is externalized from your program, has more capabilities, scales much better, is resilient, etc.

The bank has an application that consumes these events from Kafka, processes them, and updates the Postgres database with the new balance of each user. Postgres is a relational database: a place where we can store and manipulate our business data.

A money transfer will be two SQL statements — one to decrement the sender’s balance, another one to increment the receiver’s balance. We want these to happen in a single transaction — we don’t want to end up in a state where we removed money from one account, and never added it to the other account because our app crashed. Luckily Postgres (as pretty much every other relational database) supports transactions. So our users shouldn’t complain that money vanished from their account and was never received.

So if the processing of a balance transfer command crashes, we won’t execute the actual transfer. It’s good that users won’t observe any discrepancies in their accounts, but they will see that even though they issued a command to transfer funds, nothing happened. We need to tell Kafka (our queue) that we aren’t done with this processing. Since we already have transactions in our relational database that guarantee the consistency of accounts, could we use them? Kafka keeps track of how far a consumer has processed each partition using a number called the offset. We can tell Kafka that we’ll store this offset ourselves, in Postgres, and update it in the same transaction that updates the users’ funds. If anything crashes while processing messages, the transaction is rolled back, the stored offset stays unchanged, and the messages will be processed again (once the service restarts or we fix the bug).
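To make the all-or-nothing idea concrete, here is a minimal, dependency-free Scala sketch. It is not the article's code: an immutable in-memory snapshot (the hypothetical `Db` type) stands in for Postgres, a fold stands in for the transaction, amounts are plain `Int`s, and we assume the stored offset is the next offset to read (last processed + 1). If any record in the batch fails, the whole result is discarded, so both balances and offset stay untouched and a retry starts from the same place.

```scala
object AtomicBatchSketch {
  // Hypothetical stand-in for the database: balances plus the next offset
  // to read from a single partition (assumed convention: last processed + 1).
  final case class Db(balances: Map[String, Int], nextOffset: Long)

  final case class Transfer(from: String, to: String, amount: Int)

  private def adjust(b: Map[String, Int], name: String, delta: Int): Map[String, Int] =
    b.updated(name, b.getOrElse(name, 0) + delta)

  // Applies every transfer AND the offset update, or nothing at all.
  // `fail` simulates a crash while processing the record at a given offset.
  def processBatch(db: Db, batch: Seq[(Long, Transfer)], fail: Long => Boolean): Db = {
    val attempt: Either[String, Db] =
      batch.foldLeft[Either[String, Db]](Right(db)) { case (acc, (offset, t)) =>
        acc.flatMap { current =>
          if (fail(offset)) Left(s"crash at offset $offset")
          else {
            val withdrawn = adjust(current.balances, t.from, -t.amount)
            val deposited = adjust(withdrawn, t.to, t.amount)
            Right(Db(deposited, offset + 1))
          }
        }
      }
    // "Rollback": on any failure we keep the old snapshot, offset included.
    attempt.getOrElse(db)
  }
}
```

Running the same batch twice, first with a simulated crash and then without, leaves the balances as if the batch had been processed exactly once.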

During this example, we’ll create a process that issues these commands (the user’s side, so to speak), and a process that consumes them (the bank side).

Preparations

We’ll need some containers running. Let’s start Kafka with Zookeeper and Postgres.

The overall app will be based on ZIO. ZIO is, first of all, a functional programming library for building asynchronous and concurrent programs, though it has a lot more goodies.

We’ll use SBT as our build tool — the most used build tool within the Scala ecosystem. Think of it as Maven or Gradle, but for Scala.

We now need the means to communicate with Kafka, to consume and produce messages. There is a Java client we could use, but that would mean dealing with unsafe, impure code, and as I mentioned, we’d like to use ZIO. Thus we reach for ZIO Kafka as our library of choice. Don’t be alarmed: underneath, it uses the same Java library I linked, but it wraps it nicely, exposing a functional-programming-friendly interface. Let’s add it to our build.sbt file:

It transitively pulls in ZIO core and ZIO Streams. Core is the main component you need any time you’re building a ZIO app. ZIO Streams represents a stream of values that are pulled by some effectful computation, such as asking Kafka to give them to us :-). In my opinion, streams and stream-based applications are still very underused and underappreciated.

Let’s be good citizens and, while we’re in the ZIO ecosystem, pull in its logging library as well.

We could integrate it with various backends, such as SLF4J, but we don’t need any of that for this toy app; we’ll just log to the console. Should we need it, though, there are nice integrations already available.

Let’s serialize our data as JSON when we write it to Kafka, because it’s such a common format after all. We’ll use Circe.

We also enabled macro annotations in the compiler options, as we’ll be generating JSON codecs from case classes (we are too lazy to write them by hand).

As far as dependencies go, we have covered integrating with Kafka using ZIO, and serializing (and deserializing) messages with Circe.

How do we talk to Postgres? At the moment there is no native ZIO driver we could use. We could pick any of the existing Java or Scala libraries and wrap it with ZIO, but that’s a bit of a hassle. On the other hand, there is a native Postgres library called Skunk. Skunk is based on Cats and fs2, two other fantastic functional programming libraries that compete with ZIO.

Let’s add Skunk to our dependencies:

Now we have two competing libraries (ZIO and Cats). Let them become friends via an interoperability layer:
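The dependencies mentioned in this section could be collected into a single build.sbt along these lines. This is a sketch, not the project's actual build file: the module coordinates are the published ones as far as I know, but the version strings are deliberately left as placeholders (pick mutually compatible releases built for Cats Effect 3), and the compiler flag assumes Scala 2.13.

```scala
// build.sbt (sketch): version values are placeholders, not the article's versions.
val zioKafkaVersion   = "<zio-kafka version>"
val zioLoggingVersion = "<zio-logging version>"
val circeVersion      = "<circe version>"
val skunkVersion      = "<skunk version>"
val zioInteropVersion = "<zio-interop-cats version>"

// Assuming Scala 2.13: macro annotations (needed by circe's @JsonCodec) are a compiler flag.
scalacOptions += "-Ymacro-annotations"

libraryDependencies ++= Seq(
  "dev.zio"      %% "zio-kafka"        % zioKafkaVersion,   // brings in zio and zio-streams
  "dev.zio"      %% "zio-logging"      % zioLoggingVersion, // console logging
  "io.circe"     %% "circe-core"       % circeVersion,
  "io.circe"     %% "circe-generic"    % circeVersion,      // @JsonCodec lives here
  "io.circe"     %% "circe-parser"     % circeVersion,      // parser.parse
  "org.tpolecat" %% "skunk-core"       % skunkVersion,      // Postgres driver (Cats/fs2)
  "dev.zio"      %% "zio-interop-cats" % zioInteropVersion  // ZIO <-> Cats interop
)
```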

That’s it — we have everything we need. Let’s tie it all together.

Producer

Common functionality

Let’s create a producer that issues a command to transfer some money from one person to another every second.

We can define our command as follows:

There is only one possible command, TransferBalance. I encourage you to implement more commands (e.g. CloseAccount) and see how the compiler guides you. We get automatic JSON codec derivation via the @JsonCodec annotation. This is the data that will be stored in the Kafka topic. Let's give the topic a name, define some people, and a way to select a random person.
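Since the original snippet isn't reproduced here, the following is a dependency-free sketch of what such a command model might look like. The field names follow the JSON example above; the topic name, the list of people, and `describe` are hypothetical, and the `@JsonCodec` annotation is left out so the block compiles without circe.

```scala
import scala.util.Random

object CommandModel {
  // Mirrors {"from": ..., "to": ..., "amount": ...}; the article adds @JsonCodec here.
  sealed trait Command
  final case class TransferBalance(from: String, to: String, amount: Int) extends Command

  // Hypothetical values: the article's actual topic name and people are not shown here.
  val TopicName: String = "transactions"
  val People: Vector[String] = Vector("Peter", "Marek", "Marius")

  def randomPerson(rnd: Random): String = People(rnd.nextInt(People.length))

  // Exhaustive match over the sealed trait: adding e.g. CloseAccount
  // makes the compiler warn about this match until it is handled.
  def describe(c: Command): String = c match {
    case TransferBalance(from, to, amount) => s"$from -> $to: $amount"
  }
}
```

The sealed trait is what lets the compiler guide you when you add new commands: every non-exhaustive match over `Command` gets flagged.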

Even though we have defined a JSON representation for our commands, we still need to tell Kafka about it. We’ll use a Circe Printer to print it to a String and store that in Kafka. Kafka already supports String serialization and deserialization (called a Serde), which the ZIO Kafka library conveniently wraps in zio.kafka.serde.Serde.string. We can inmapM over it: essentially, we describe how to go from a String to our Command (for deserialization) and how to go from a Command to a String (for serialization).

The first function of inmapM (x => ZIO.fromEither(parser.parse(x).flatMap(_.as[Command]))) describes how to read the String stored in Kafka. We parse it into JSON (parser.parse(x)) and decode it into our Command (.flatMap(_.as[Command])). Both steps can fail: parsing fails when the string is not valid JSON, and decoding fails when the JSON does not match our Command structure, and we want the effect to fail if that happens. The error is represented as an Either, which we lift into ZIO with ZIO.fromEither.

The second function of inmapM (x => ZIO.succeed(x.asJson.printWith(commandSerdePrinter))) first encodes our Command into JSON (x.asJson) using Circe. Nothing can go wrong here: every instance of Command can be encoded as JSON. The second part (.printWith(commandSerdePrinter)) prints that JSON to a String. Again, no error can happen, so we lift the result into a successful ZIO.
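The shape of this mapping (a fallible read side and an infallible write side) can be shown without Kafka or circe. The sketch below is a toy: `StringSerde` and its `inmap` are hypothetical stand-ins, not zio-kafka's `Serde`, and the JSON is hand-rolled with a regular expression in place of circe, assuming names contain no quotes or escape sequences. It needs Scala 2.13+ for `toIntOption`.

```scala
object ToySerde {
  sealed trait Command
  final case class TransferBalance(from: String, to: String, amount: Int) extends Command

  // Hypothetical mini "serde": NOT zio.kafka.serde.Serde, just the same idea.
  final case class StringSerde[A](deserialize: String => Either[String, A], serialize: A => String) {
    // Like inmapM: the read side may fail (Either), the write side cannot.
    def inmap[B](read: A => Either[String, B])(write: B => A): StringSerde[B] =
      StringSerde[B](s => deserialize(s).flatMap(read), b => serialize(write(b)))
  }

  // Counterpart of Serde.string: Strings pass through unchanged.
  val string: StringSerde[String] = StringSerde[String](s => Right(s), s => s)

  // Hand-rolled JSON instead of circe; assumes names contain no quotes or escapes.
  private val Json = """\{"from":"([^"]*)","to":"([^"]*)","amount":(-?\d+)\}""".r

  val commandSerde: StringSerde[Command] =
    string.inmap[Command] {
      case Json(from, to, amount) =>
        amount.toIntOption.toRight(s"bad amount: $amount").map(a => TransferBalance(from, to, a))
      case other => Left(s"not a Command: $other")
    } {
      case TransferBalance(from, to, amount) => s"""{"from":"$from","to":"$to","amount":$amount}"""
    }
}
```

Round-tripping a command through `serialize` and `deserialize` returns the original value, while a string that isn't a valid command surfaces as a `Left`, which is the role `ZIO.fromEither` plays in the real Serde.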

Producer Implementation

Now we can implement the actual producer in 30ish lines of code.

We define some ProducerSettings, then create a Kafka producer with our Serde: Producer.make(producerSettings, Serde.int, CommandSerde). We generate a command with some random values and append it to our topic with p.produce(TopicName, event.hashCode(), event). Don't worry about the hashCode part: it is used as the Kafka record key and is not relevant for this example. We want this program to run forever, producing a value every second, via (produce *> ZIO.sleep(1.second)).forever.
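The producer's logic, stripped of Kafka and ZIO, boils down to "generate a random command, key it by its hash code, append it, pause, repeat". The sketch below models that with a bounded loop that returns the appended records instead of sending them; the people, the amount range, and all function names are hypothetical, and the loop is finite so it can actually be run and checked.

```scala
import scala.util.Random

object ProducerSketch {
  final case class TransferBalance(from: String, to: String, amount: Int)

  // Hypothetical people and amount range; the article's values are not shown here.
  val People: Vector[String] = Vector("Peter", "Marek", "Marius")

  def randomCommand(rnd: Random): TransferBalance =
    TransferBalance(
      from = People(rnd.nextInt(People.size)),
      to = People(rnd.nextInt(People.size)),
      amount = rnd.nextInt(1000) + 1
    )

  // One tick: the record key is the event's hashCode, as in p.produce(TopicName, event.hashCode(), event).
  def produceOnce(rnd: Random): (Int, TransferBalance) = {
    val event = randomCommand(rnd)
    event.hashCode -> event
  }

  // Bounded stand-in for (produce *> ZIO.sleep(1.second)).forever.
  def run(ticks: Int, pauseMillis: Long, rnd: Random): Vector[(Int, TransferBalance)] =
    Vector.fill(ticks) {
      val record = produceOnce(rnd)
      Thread.sleep(pauseMillis)
      record
    }
}
```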

Because we added a few logging statements, we need to provide the Logging layer (from the ZIO Logging library) using .provideCustomLayer(loggingLayer). You can learn more about ZIO Layers here.

That’s it — running this app will continuously send events into Kafka:

Consumer

SQL statements preparations

We want to store two structures in our database: account balances and topic partition offsets. Let’s start with account balances:

Balance holds the account owner’s name and their current balance. We’ll update this balance based on the commands we process.

Let’s prepare a DDL statement that creates a table representing our Balance:

We use Skunk’s sql interpolator, followed by .command, to turn this into a Command. So far we are not executing anything; we’re simply preparing all the SQL we need.

The balance will be updated during processing, so we need a statement that inserts the balance if it does not exist, and otherwise increments or decrements it by the desired amount. We prepare a parameterized statement that takes two arguments: the name and the amount to update the balance with.
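The "insert if missing, otherwise add the delta" semantics can be modelled as a pure function over an in-memory table, which makes it easy to check what the statement is supposed to do. This is only a model, not the Skunk statement: one common way to express it in Postgres is INSERT ... ON CONFLICT ... DO UPDATE, but the article's exact SQL isn't reproduced here.

```scala
object BalanceUpsert {
  final case class Balance(name: String, balance: Int)

  // In-memory model of the upsert: insert a new row with `delta`,
  // or add `delta` to the existing row's balance.
  def upsert(table: Map[String, Balance], name: String, delta: Int): Map[String, Balance] =
    table.get(name) match {
      case Some(b) => table.updated(name, b.copy(balance = b.balance + delta))
      case None    => table.updated(name, Balance(name, delta))
    }
}
```

Using a single upsert for both withdrawals and deposits (with a negative or positive delta) means a brand-new account needs no separate "create" step.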

We also want to store topic partitions in our relational database, so let’s prepare the representation and SQL statements.

In our table, we can store TopicPartition as varchar, with a format of $topicname-$partition. Our DDL statement would be:
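Encoding is easy; decoding has one subtlety worth showing. Kafka topic names may themselves contain '-', so the sketch below splits on the last '-'. The local `TopicPartition` case class is a stand-in for Kafka's `org.apache.kafka.common.TopicPartition` to keep the block dependency-free; `partitionToString` reuses the name the article mentions later, but its implementation there isn't shown, and `parseTopicPartition` is hypothetical. Requires Scala 2.13+ for `toIntOption`.

```scala
object TopicPartitionFormat {
  // Local stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  def partitionToString(tp: TopicPartition): String = s"${tp.topic}-${tp.partition}"

  // Split on the LAST '-', since topic names may contain '-' themselves.
  // Anything that doesn't end in "-<non-negative int>" is rejected.
  def parseTopicPartition(s: String): Option[TopicPartition] = {
    val i = s.lastIndexOf('-')
    if (i <= 0) None
    else s.substring(i + 1).toIntOption.filter(_ >= 0).map(TopicPartition(s.substring(0, i), _))
  }
}
```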

Let’s see how we could query these offsets for some partitions:

We create a parameterized query:

We are creating a query that takes a variable-length list of strings and returns a tuple of String and Int (the topic partition and the offset stored in our database). Our case class, however, holds a TopicPartition, not a String, and not every String we get from the database necessarily represents a topic partition. Let’s parse them and keep only valid topic partitions.

The last missing piece is SQL to upsert partition offsets. If they are missing, we will insert them, otherwise, we’ll update the existing offset to the new one.

Consumer implementation

We have prepared SQL statements for creating the offsets table, for querying topic partition data, and for upserting data.

Let’s set expectations for our consumer’s implementation. We expect to:

  • establish a session to Postgres (so we can create tables and perform updates based on events)
  • use the SQL statements that we prepared in the previous section
  • subscribe to Kafka to consume events
  • process batches of events, committing both Kafka offsets and changes to account balances in a single transaction
  • write a bit of glue code to integrate Skunk (based on Cats) with ZIO Kafka

Let’s start by defining our ZIO app, and preparing our Postgres session. We’ll represent it as ZIO Layer.

What’s going on here? We introduce a type alias type SessionTask = Session[Task] for clarity. It represents a Session that can execute statements within ZIO Task effect.

The Session.single call, which comes from the Skunk library, creates a Session. It is wrapped in a Resource to ensure that we both acquire and release it properly. But Resource is a Cats construct, so we convert it to its ZIO equivalent, ZManaged, via the .toManagedZIO call. For that, we need import zio.interop.catz._ as well as an implicit ZIO runtime in scope. We can get one from ZManaged.runtime, over which we flatMap to bring it into the current scope: ZManaged.runtime.flatMap { implicit r: Runtime[ZEnv] =>. Lastly, we convert the ZManaged to a layer via the toLayer call.

Don’t worry about import natchez.Trace.Implicits.noop too much; it just provides a no-op tracing instance, which Skunk requires.

Overall, the whole integration of ZIO and Cats took a single import and bringing in implicit Runtime into scope. That’s pretty simple.

Let’s define our program. We want to create a Kafka consumer with manual offset retrieval (from Postgres).

We again bring an implicit runtime into scope, as we’re working with the Cats-based Session. We specify manual offset retrieval with Consumer.OffsetRetrieval.Manual { partitions =>. In it, we specify how to fetch offsets for a set of partitions. We first need to convert our partitions into the string representation that we use in the database ($topic-$partition) via partitions.map(partitionToString).toList.

We prepare the query (which we created in the previous section) for selecting offsets, session.prepare(query).toManagedZIO, and use it with our list of partitions to stream all results: .use(_.stream(list, 64).compile.toVector). A query that fetches offsets for many partitions may return many rows, and thinking in terms of streams keeps us from forgetting that. Not every query you run against Postgres has the nice property of returning only a few rows.

As mentioned in the previous section, not everything stored in the database is necessarily a valid TopicPartition string representation. We handle this decoding in our SQL definition, so now we only need to collect the valid ones: .map(xs => xs.collect { case Some(t) => t }). We then convert the result into the structure that Consumer.OffsetRetrieval.Manual expects, a map from TopicPartition to Long (the offset), via .map(x => x.topic -> x.offset).toMap.
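Outside of ZIO and Skunk, the data flow of this retrieval function can be sketched with plain collections: partitions become query keys, the query returns rows, the decoder may yield `None`, and the valid rows become the offset map. In this sketch a `Vector` of `(String, Long)` rows stands in for what the prepared query streams back; all names except the `collect`/`toMap` steps described above are hypothetical, and `TopicPartition` is again a local stand-in for Kafka's class.

```scala
object ManualOffsetRetrieval {
  final case class TopicPartition(topic: String, partition: Int)
  final case class PartitionOffset(topic: TopicPartition, offset: Long)

  private def parse(s: String): Option[TopicPartition] = {
    val i = s.lastIndexOf('-')
    if (i <= 0) None
    else s.substring(i + 1).toIntOption.map(TopicPartition(s.substring(0, i), _))
  }

  // Query parameters: the requested partitions rendered as "$topic-$partition".
  def keysFor(partitions: Set[TopicPartition]): List[String] =
    partitions.toList.map(tp => s"${tp.topic}-${tp.partition}")

  // `rows` stands in for the (String, Long) rows streamed back by the prepared query.
  def toOffsetMap(rows: Vector[(String, Long)]): Map[TopicPartition, Long] =
    rows
      .map { case (key, offset) => parse(key).map(PartitionOffset(_, offset)) } // decoding may fail
      .collect { case Some(t) => t }                                             // keep valid rows only
      .map(x => x.topic -> x.offset)
      .toMap
}
```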

In the last step, we create Consumer based on settings (for manual retrieval) to connect to our local Kafka instance.

So far we’ve just been preparing things, such as SQL statements, the Kafka consumer, and the Postgres session, but nothing has really happened yet. Let’s create the tables and actually consume the Kafka topic to process events.

Building on previous code:

We use the Postgres session to execute DDL statements that set up our database, and subscribe to the Kafka topic via consumer.subscribeAnd(Subscription.topics(TopicName)). This is the same topic (Subscription.topics(TopicName)) that we produce messages into, and we use the same serializer-deserializer, CommandSerde.

We tap into the stream and log every message using .tap(x => log.info(x.value.toString)), and we run the stream to exhaustion, which will never happen, as the stream of events from Kafka is infinite: at any time, the producer can append a new event to it.

Let’s add another operation to the stream, where we actually process the messages and offset management in a single transaction. This is the beefy part — and it’s way simpler than it looks.

We want to process the stream in chunks of Kafka records (think of it as efficiently pulling a bunch of messages at once), via .foreachChunk { chunk =>. From the chunk’s offsets we create an OffsetBatch, a data structure that merges them (keeping the highest offset per partition), via OffsetBatch(chunk.map(_.offset)).

We turn these offsets into a list for our offset update command, offsets.map { case (topicPartition, offset) => ... }; it's the argument that Sql.updatePartitionCommand needs.
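The merge-then-update step can be sketched with plain collections: many record offsets collapse into one offset per partition (the highest one seen), and each partition then becomes one row for the offset upsert. This is a hypothetical helper that mimics the idea of merging into an OffsetBatch; it is not zio-kafka's implementation, and `TopicPartition` is a local stand-in.

```scala
object OffsetMerge {
  final case class TopicPartition(topic: String, partition: Int)

  // Keep the highest offset seen per partition, regardless of record order.
  def merge(offsets: Seq[(TopicPartition, Long)]): Map[TopicPartition, Long] =
    offsets.foldLeft(Map.empty[TopicPartition, Long]) { case (acc, (tp, off)) =>
      acc.updated(tp, acc.get(tp).fold(off)(_ max off))
    }

  // One row per partition for the offset upsert, in the "$topic-$partition" format.
  def updateRows(batch: Map[TopicPartition, Long]): List[(String, Long)] =
    batch.toList.map { case (tp, off) => s"${tp.topic}-${tp.partition}" -> off }
}
```

However many records a chunk holds, the transaction only needs one offset row per partition it touched.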

Mapping over the chunk, we get a chunk of the actual commands, the ones our producer pushed to Kafka: chunk.map(_.value).

We open and use a transaction: session.transaction.toManagedZIO.use { transaction =>. Inside the transaction, we want to execute two SQL statements for every command: one to withdraw funds from the sender’s account, and another to deposit them into the receiver’s account. We create these statements for each command in our chunk using ZIO.foreach_(commands) {.

Executing a statement is a matter of taking the SQL we wrote, preparing it, and feeding it the values from the command that we are processing.
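The "two statements per command" rule can be checked in isolation: each transfer expands into a withdrawal and a deposit, so the deltas of any chunk sum to zero. In this sketch each statement is represented as a hypothetical `(name, amount to add)` pair, standing in for the arguments fed to the prepared balance upsert.

```scala
object ChunkStatements {
  final case class TransferBalance(from: String, to: String, amount: Int)

  // Hypothetical representation of one upsert's arguments: (name, amount to add).
  type BalanceUpdate = (String, Int)

  // Two statements per command: withdraw from the sender, deposit to the receiver.
  def statementsFor(chunk: Seq[TransferBalance]): Seq[BalanceUpdate] =
    chunk.flatMap(c => Seq(c.from -> -c.amount, c.to -> c.amount))
}
```

Because every pair cancels out, the sum of all balances stays at 0 after each committed chunk, which is the invariant checked at the end of the article.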

After every statement has been executed within the transaction, we also append the statement that stores the Kafka offsets, and we commit the transaction:

That’s it, that’s our main ZIO computation. We provide it with our Logging and Session layers and run it:

If we run the producer and consumer we should see that messages are being both produced and consumed. If we check our Postgres instance, we can find our offsets (yours will differ):

Checking the balances

Because everyone started with $0, and we handle every transfer in a transaction, at any given time the sum of all balances should equal 0, even if you introduce random failures into the pipeline.

Next steps

If you want to have some more fun or test it more, try implementing the following:

  • inject random failures into the computation
  • support more commands
  • disallow having a negative balance
  • in case of error, commit offset, but enqueue the message again at the end of your Kafka Topic (maybe store how many times this operation was tried)

Conclusion

We successfully made libraries from competing ecosystems play together with just a few lines of code. We stayed within pure functional Scala at all times, composing simple programs into bigger, more complex ones.

We saw two kinds of streams. The first one (fs2, Cats-based) came from the Skunk library when querying a table for entries (possibly millions of rows, should the table serve a different use case). The second one (ZIO Stream) represented our subscription to the Kafka topic, to which our producer pushed events. Our consumer processed them in chunks, turned each chunk into a single transaction, and committed it together with the offsets of our Kafka topic.

The effect and streaming libraries played together very nicely and allowed us to focus on the domain problem, which shows the lengths to which Scala library authors are willing to go to provide good ergonomics for end users.