Getting started
Sage is a native Redis and Valkey client for Scala 3. There is no Java client wrapped underneath: the RESP3 protocol, the commands, and the codecs are implemented directly in Scala, on a zero-dependency, effect-free core.
That core is paired with a runtime written once and cross-published for Ox, ZIO, Cats Effect, and Kyo, so you use sage with your ecosystem's native types and no wrapper in sight. It targets RESP3 and modern Redis 8+ / Valkey 8+, runs on Scala 3.3.x LTS and later, and requires JDK 21+.
Installation
Add the artifact for your effect system. The core is pulled in transitively, so you depend on one module only.
"com.github.ghostdogpr" %% "sage-client-ox" % "0.1.0""com.github.ghostdogpr" %% "sage-client-zio" % "0.1.0""com.github.ghostdogpr" %% "sage-client-ce" % "0.1.0""com.github.ghostdogpr" %% "sage-client-kyo" % "0.1.0"Two imports cover everything: import sage.* for the command vocabulary and connection config, and import sage.backend.* for the client. That second import is the same regardless of effect system; only the dependency you choose differs.
Your first connection
A SageClient owns all connections to one server or cluster. You build it from a SageConfig using your ecosystem's idiomatic construction form: a scoped resource for Ox and Kyo, a ZLayer for ZIO, and a Resource for Cats Effect. The command surface is identical across all four; only this wiring differs.
import ox.supervised
import sage.*
import sage.backend.*
@main def main(): Unit =
supervised {
val config = SageConfig(
topology = Topology.Standalone(Endpoint("localhost", 6379))
)
val client = SageClient.scoped(config)
client.set("greeting", "hello")
val greeting = client.get[String]("greeting")
println(s"greeting=$greeting") // Some("hello")
}import zio.*
import sage.*
import sage.backend.*
object Main extends ZIOAppDefault {
val config = SageConfig(
topology = Topology.Standalone(Endpoint("localhost", 6379))
)
def run =
ZIO.serviceWithZIO[SageClient] { client =>
for {
_ <- client.set("greeting", "hello")
greeting <- client.get[String]("greeting")
} yield greeting
}.provide(SageClient.layer(config))
}import cats.effect.{IO, IOApp}
import sage.*
import sage.backend.*
object Main extends IOApp.Simple {
val config = SageConfig(
topology = Topology.Standalone(Endpoint("localhost", 6379))
)
def run: IO[Unit] =
SageClient.resource(config).use { client =>
for {
_ <- client.set("greeting", "hello")
greeting <- client.get[String]("greeting")
} yield ()
}
}import kyo.*
import sage.*
import sage.backend.*
object Main extends KyoApp {
val config = SageConfig(
topology = Topology.Standalone(Endpoint("localhost", 6379))
)
run {
Scope.run {
for {
client <- SageClient.scoped(config)
_ <- client.set("greeting", "hello")
greeting <- client.get[String]("greeting")
} yield greeting
}
}
}How it works
A SageClient is not a single connection, and ordinary commands do not borrow from a pool. They are written to one multiplexed connection per node, shared by every fiber: sage auto-pipelines them, coalescing concurrent commands into fewer socket writes and matching replies back in FIFO order. This is transparent (you never assemble a pipeline to get it), and it is what keeps throughput high under concurrency.
Two cases step off that connection automatically. Commands that hold per-connection state or block (WATCH/MULTI/EXEC, BLPOP, and the like) lease a dedicated connection from an on-demand pool for the duration. Pub/sub subscriptions share a separate subscription connection, created the first time you subscribe, so a slow consumer can never stall command replies.
A short tour
The snippets below show the same operations on each backend: pick your tab. In Ox they return values directly; in ZIO, Cats Effect, and Kyo they are steps in a for-comprehension over that ecosystem's effect type. Each assumes a client in scope and the usual imports for your effect type (plus import scala.concurrent.duration.* where a duration appears).
Commands
Methods are named one-for-one with Redis commands, grouped by family (strings, hashes, lists, sets, sorted sets, and so on). Keys and values are typed: the key type is fixed on the client (String by default), so a read only names its value type. Any type with a ValueCodec (here, a User) rides over the wire the same way as a String.
client.set("greeting", "hello")
val greeting = client.get[String]("greeting") // Some("hello")
client.incrBy("counter", 10)
client.hSet("user:1", ("name", "Ada"), ("age", "36"))
val profile = client.hGetAll[String, String]("user:1")
// Map("name" -> "Ada", "age" -> "36")
client.set("user:ada", User("Ada", 36))
val ada = client.get[User]("user:ada") // Some(User("Ada", 36))for {
_ <- client.set("greeting", "hello")
greeting <- client.get[String]("greeting")
_ <- client.incrBy("counter", 10)
_ <- client.hSet("user:1", ("name", "Ada"), ("age", "36"))
profile <- client.hGetAll[String, String]("user:1")
_ <- client.set("user:ada", User("Ada", 36))
ada <- client.get[User]("user:ada")
} yield (greeting, profile, ada)See Commands & codecs for the full vocabulary and how to write a codec for your own types.
Pipelines and transactions
Compose commands into a pipeline to send them in one round-trip and get back a typed tuple of results:
client.set("pipe:a", "x")
client.set("pipe:n", 10)
val tuple = client.pipeline(
(
Commands.get[String, String]("pipe:a"),
Commands.incrBy("pipe:n", 5)
)
)for {
_ <- client.set("pipe:a", "x")
_ <- client.set("pipe:n", 10)
tuple <- client.pipeline(
(
Commands.get[String, String]("pipe:a"),
Commands.incrBy("pipe:n", 5)
)
)
} yield tupleA transaction runs a pipeline atomically via MULTI/EXEC, optionally guarded by WATCH for optimistic concurrency. A None result means a watched key changed before EXEC, the normal signal to retry:
client.set("tx:n", 1)
val result = client.transaction { tx =>
tx.watch("tx:n")
tx.get[Int]("tx:n")
tx.exec(
(Commands.incr("tx:n"), Commands.incrBy("tx:n", 4))
)
}for {
_ <- client.set("tx:n", 1)
result <- client.transaction { tx =>
for {
_ <- tx.watch("tx:n")
_ <- tx.get[Int]("tx:n")
res <- tx.exec(
(
Commands.incr("tx:n"),
Commands.incrBy("tx:n", 4)
)
)
} yield res
}
} yield resultThe distinction is covered in Pipelines & transactions.
Pub/Sub
Subscribing yields a stream of messages in your ecosystem's native stream type: an Ox Flow, a ZIO ZStream, an fs2 Stream, or a Kyo Stream. Ending the stream, or closing its scope, unsubscribes.
Because publishing here happens right after subscribing, these examples use the variant that returns only once the server has confirmed the subscription, so the publish can't outrun the registration: subscribeScoped on ZIO and Kyo, subscribeResource on Cats Effect, and plain subscribe on Ox (where the call is already synchronous). The plain stream-returning subscribe registers lazily on first pull and is the right choice for a long-lived consumer that isn't racing its own publisher.
val news = client.subscribe[String]("news")
(1 to 3).foreach(i => client.publish("news", s"item-$i"))
val messages = news.take(3).runToList()ZIO.scoped {
for {
stream <- client.subscribeScoped[String]("news")
_ <- ZIO.foreachDiscard(1 to 3) { i =>
client.publish("news", s"item-$i")
}
messages <- stream.take(3).runCollect
} yield messages.map(_.payload).toList
}client.subscribeResource[String]("news").use { stream =>
for {
_ <- (1 to 3).toList.traverse_ { i =>
client.publish("news", s"item-$i")
}
messages <- stream.take(3).compile.toVector
} yield messages.map(_.payload).toList
}for {
stream <- client.subscribeScoped[String]("news")
_ <- Kyo.foreachDiscard(1 to 3) { i =>
client.publish("news", s"item-$i")
}
chunk <- stream.take(3).run
} yield chunk.toList.map(_.payload)Classic and sharded pub/sub are both covered in Pub/Sub.
Cached reads
Opt a read into client-side caching per call. The first read fetches and caches; the second is served locally until a server invalidation or the TTL evicts it.
client.set("cached:key", "v1")
// first fetches and caches; second is a local hit
val v1 = client.cached(Commands.get[String, String]("cached:key"), 1.minute)
val v2 = client.cached(Commands.get[String, String]("cached:key"), 1.minute)for {
_ <- client.set("cached:key", "v1")
v1 <- client.cached(Commands.get[String, String]("cached:key"), 1.minute)
v2 <- client.cached(Commands.get[String, String]("cached:key"), 1.minute)
} yield (v1, v2)More in Client-side caching.
Next steps
- Commands & codecs for the command surface and typing your own values
- Pipelines & transactions for batching and atomicity
- Pub/Sub for classic and sharded messaging
- Streams for append-only logs and consumer groups
- Client-side caching for cached reads and invalidation
- Configuration for cluster, master-replica, read routing, and TLS
- Error handling and Observability