Reactive Couchbase

A reactive Scala driver for Couchbase

About ReactiveCouchbase

ReactiveCouchbase is a Scala driver that provides non-blocking, asynchronous I/O operations on top of Couchbase. It is designed to avoid blocking on database operations: every operation returns immediately and uses the Scala Future API to resume execution once the result is available. Because no thread sits waiting on the database, database access stops being a source of blocked threads in your application. ReactiveCouchbase is also strongly focused on streaming data into and out of your Couchbase servers, using Reactive Streams by way of Akka Streams.
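To make "returns immediately" concrete, here is a minimal sketch that uses only the Scala standard library. `fetchDocument` is a hypothetical stand-in for a driver call, not part of the ReactiveCouchbase API, and a timer plays the role of the network callback that would normally complete the Future.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object NonBlockingSketch {
  // Timer thread standing in for the I/O layer that delivers results later.
  val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Hypothetical stand-in for a driver call (NOT the ReactiveCouchbase API):
  // it returns a Future at once, and the timer completes it 100 ms later.
  def fetchDocument(key: String): Future[String] = {
    val promise = Promise[String]()
    scheduler.schedule(new Runnable {
      def run(): Unit = promise.success(s"""{"key":"$key","message":"Hello World"}""")
    }, 100L, TimeUnit.MILLISECONDS)
    promise.future
  }

  def main(args: Array[String]): Unit = {
    val pending = fetchDocument("key1")      // returns immediately, no thread waits
    val upper   = pending.map(_.toUpperCase) // runs once the result arrives
    // Await is used only so this demo can print before the JVM exits.
    println(Await.result(upper, 2.seconds))
    scheduler.shutdown()
  }
}
```

In application code you would normally keep composing with `map`, `flatMap`, or a for-comprehension rather than calling `Await`, so that no thread is parked while the database works.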

Work in progress

ReactiveCouchbase v2 (ReactiveStreams edition) is currently under heavy development. There is no binary available yet, but you can build the driver on your machine to try it:

git clone
cd reactivecouchbase-rs-core
sbt ';clean;compile;publish-local'

Then add the dependency to your project:

libraryDependencies += "org.reactivecouchbase" % "reactivecouchbase-core" % "2.0.0-SNAPSHOT"

If you encounter any issue during build or usage, just let me know.

Simple example

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.typesafe.config.ConfigFactory
import org.reactivecouchbase.scaladsl.{N1qlQuery, ReactiveCouchbase}
import play.api.libs.json.Json

object ReactiveCouchbaseTest extends App {

  val system = ActorSystem("ReactiveCouchbaseSystem")

  implicit val materializer = ActorMaterializer.create(system)
  implicit val ec = system.dispatcher

  val driver = ReactiveCouchbase(ConfigFactory.parseString(
    """
      |buckets {
      |  default {
      |    name = "default"
      |    hosts = [""]
      |  }
      |}
    """.stripMargin), system)

  val bucket = driver.bucket("default")

  // Assumed API: bucket.search(query) followed by .asSeq (collect everything)
  // or .asSource (stream the results through Akka Streams).
  val future = for {
    _        <- bucket.insert("key1",
                  Json.obj("message" -> "Hello World", "type" -> "doc"))
    doc      <- bucket.get("key1")
    exists   <- bucket.exists("key1")
    docs     <- bucket.search(
                  N1qlQuery("select message from default where type = $type")
                  .on(Json.obj("type" -> "doc")))
                  .asSeq
    messages <- bucket.search(
                  N1qlQuery("select message from default where type = 'doc'"))
                  .asSource
                  .map(doc => (doc \ "message").as[String].toUpperCase)
                  .runWith(Sink.seq[String])
    _        <- driver.terminate()
  } yield (doc, exists, docs)

  future.map {
    case (_, _, docs) => println(s"found $docs")
  }
}


What about a Play Framework plugin?

I don't think you actually need a plugin. If you want to use the driver from Play Framework, you can define a service that gives access to your buckets, like the following:

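import javax.inject._
import play.api.inject.ApplicationLifecycle
import play.api.Configuration
import org.reactivecouchbase.scaladsl._

// A single driver instance is shared by the whole application
@Singleton
class Couchbase @Inject()(configuration: Configuration,
      lifecycle: ApplicationLifecycle) {

  // Assumption: the driver settings live under a "reactivecouchbase" key in
  // application.conf, and ReactiveCouchbase can be built from a Config alone.
  private val driver =
    ReactiveCouchbase(configuration.underlying.getConfig("reactivecouchbase"))

  def bucket(name: String): Bucket = driver.bucket(name)

  // Shut the driver down when the application stops
  lifecycle.addStopHook { () =>
    driver.terminate()
  }
}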

You can then define a controller like the following:

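import javax.inject._
import scala.concurrent.ExecutionContext
import akka.stream.Materializer
import play.api.mvc._
import play.api.libs.json._
import org.reactivecouchbase.scaladsl.N1qlQuery

class ApiController @Inject()(couchbase: Couchbase)
    (implicit ec: ExecutionContext, materializer: Materializer) extends Controller {

  def eventsBucket = couchbase.bucket("events")

  def events(filter: Option[String] = None) = Action {
    // Assumed API: bucket.search(query).asSource yields a stream of JSON documents
    val source = eventsBucket
      .search(N1qlQuery(
        "select id, payload, date, params, type from events where type = $type"
      )
      .on(Json.obj("type" -> filter.getOrElse("doc"))))
      .asSource
      .map(Json.stringify)
      .intersperse("[", ",", "]")
    // Assumed final step: stream the framed chunks to the client as one JSON array
    Ok.chunked(source).as("application/json")
  }
}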
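The `.intersperse("[", ",", "]")` step in the controller is what lets a stream of individual documents reach the client as one valid JSON array, without collecting the documents in memory first. The sketch below reproduces that framing with a plain standard-library `Iterator` so the emitted chunks are easy to see. `frame` is a hypothetical helper written for illustration; it is not part of Akka Streams or ReactiveCouchbase.

```scala
object IntersperseSketch {
  // Lazily frames already-serialized JSON documents as a single JSON array:
  // an opening "[", a "," between consecutive documents, and a closing "]".
  def frame(docs: Iterator[String]): Iterator[String] = {
    val separated = docs.zipWithIndex.flatMap {
      case (doc, 0) => Iterator(doc)      // first document: no separator
      case (doc, _) => Iterator(",", doc) // later documents: separator first
    }
    Iterator("[") ++ separated ++ Iterator("]")
  }

  def main(args: Array[String]): Unit = {
    val docs = Iterator("""{"id":1}""", """{"id":2}""", """{"id":3}""")
    // Concatenating the chunks gives the body a client would receive.
    println(frame(docs).mkString)
  }
}
```

Each chunk can be sent as soon as it is produced, which is why the controller can answer with a chunked response instead of building the whole array up front.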


The core of ReactiveCouchbase is available on GitHub and depends on the Play JSON library and Akka Streams.