View on GitHub

Reactive Couchbase

A reactive Scala driver for Couchbase

About ReactiveCouchbase

ReactiveCouchbase is a scala driver that provides non-blocking and asynchronous I/O operations on top of Couchbase. ReactiveCouchbase is designed to avoid blocking on each database operations. Every operation returns immediately, using the elegant Scala Future API to resume execution when it’s over. With this driver, accessing the database is not an issue for performance anymore. ReactiveCouchbase is also highly focused on streaming data in and out from your Couchbase servers using the very nice ReactiveStreams on top of Akka Streams.

Work in progress

ReactiveCouchbase RS (ReactiveStreams edition) is currently under heavy development. If you want to try it, add a resolver to your build.sbt file

resolvers += "reactivecouchbase-rs-snapshots" at "https://raw.github.com/ReactiveCouchbase/reactivecouchbase-rs-core/master/repository/snapshots"

resolvers += "reactivecouchbase-rs-releases" at "https://raw.github.com/ReactiveCouchbase/reactivecouchbase-rs-core/master/repository/releases"

or you can build it to get the nice goodies

git clone https://github.com/ReactiveCouchbase/reactivecouchbase-rs-core.git
cd reactivecouchbase-rs-core
sbt ';clean;compile;publish-local'

then in your project add the following dependency

libraryDependencies += "org.reactivecouchbase" %% "reactivecouchbase-rs-core" % "1.0.0"

or the latest snpashot

libraryDependencies += "org.reactivecouchbase" %% "reactivecouchbase-rs-core" % "1.0.1-SNAPSHOT"

and you’re ready to go

If you encounter any issue during build or usage, just let me know.

Simple example

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory
import org.reactivecouchbase.rs.scaladsl.{N1qlQuery, ReactiveCouchbase}
import org.reactivecouchbase.rs.scaladsl.json._
import play.api.libs.json._
import akka.stream.scaladsl.Sink
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory

object ReactiveCouchbaseTest extends App {

  val system = ActorSystem("ReactiveCouchbaseSystem")
  
  implicit val materializer = ActorMaterializer.create(system)
  implicit val ec = system.dispatcher

  val driver = ReactiveCouchbase(ConfigFactory.parseString(
    """
      |buckets {
      |  default {
      |    name = "default"
      |    hosts = ["127.0.0.1"]
      |  }
      |}
    """.stripMargin))

  val bucket = driver.bucket("default")

  val future = for {
    _        <- bucket.insert[JsValue]("key1", 
                  Json.obj("message" -> "Hello World", "type" -> "doc"))
    doc      <- bucket.get("key1")
    exists   <- bucket.exists("key1")
    docs     <- bucket.search(
                    N1qlQuery("select message from default where type = $type")
                  .on(Json.obj("type" -> "doc")).asQueryParams)
                  .asSeq
    messages <- bucket.search(
                    N1qlQuery("select message from default where type = 'doc'"))
                  .asSource.map(doc => (doc \ "message").as[String].toUpperCase)
                  .runWith(Sink.seq[String])
    _        <- driver.terminate()
  } yield (doc, exists, docs)

  future.map {
    case (_, _, docs) => println(s"found $docs")
  }

}

What about a PlayFramework plugin ?

I don’t think you actually need a plugin, if you want to use it from Play Framework, you can define a service to access your buckets like the following :

import javax.inject._
import play.api.inject.ApplicationLifecycle
import play.api.Configuration
import org.reactivecouchbase.rs.scaladsl._

@Singleton
class Couchbase @Inject()(configuration: Configuration, 
      lifecycle: ApplicationLifecycle) {

  private val driver = 
      ReactiveCouchbase(configuration.underlying.getConfig("reactivecouchbase"))

  def bucket(name: String): Bucket = driver.bucket(name)

  lifecycle.addStopHook { () =>
    driver.terminate()
  }
}

so you can define a controller like the following

import javax.inject._
import scala.concurrent.ExecutionContext
import org.reactivecouchbase.rs.scaladsl.json._
import play.api.mvc._
import akka.stream.Materializer
import play.api.libs.json._

@Singleton
class ApiController @Inject()(couchbase: Couchbase)
    (implicit ec: ExecutionContext, materializer: Materializer) extends Controller {

  def eventsBucket = couchbase.bucket("events")

  def events(filter: Option[String] = None) = Action {
    val source = eventsBucket
      .search(N1qlQuery(
        "select id, payload, date, params, type from events where type = $type"
      )
      .on(Json.obj("type" -> filter.getOrElse("doc")).asQueryParams)
      .asSource
      .map(Json.stringify)
      .intersperse("[", ",", "]")
    Ok.chunked(source).as("application/json")
  }
}

What if I want to use a JSON lib other than Play Json ?

you can easily do that, actually everything linked to Play Json is imported from

import org.reactivecouchbase.rs.scaladsl.json._

then you just have to reimplement a few things

import akka.util.ByteString
import com.couchbase.client.java.document.json.JsonObject
import org.reactivecouchbase.rs.scaladsl.json.{JsonReads, JsonWrites, JsonSuccess, QueryParams}
import foo.bar.jsonlib.{JsonNode, JsonObj}

val read: JsonReads[JsonNode] = JsonReads(bs => JsonSuccess(JsonNode.parse(bs.utf8String)))
val write: JsonWrites[JsonNode] = JsonWrites(jsv => ByteString(JsonNode.stringify(jsv)))

implicit val defaultByteStringFormat: JsonFormat[JsonNode] = JsonFormat(read, write)

implicit val defaultByteStringConverter: CouchbaseJsonDocConverter[JsonNode] = new CouchbaseJsonDocConverter[JsonNode] {
  override def convert(ref: AnyRef): JsonNode = ...
}

case class JsonObjQueryParams(query: JsonObj = ByteString.empty) extends QueryParams {
  override def isEmpty: Boolean = !query.hasValue
  override def toJsonObject: JsonObject = ...
}

You have a few examples at

Projects

The core of ReactiveCouchbase RS is available on Gihtub and depends on Play Json library and Akka Streams

Community