Reactive MongoDB Driver for Scala and Java 8
Reactive MongoDB Driver for Scala 2.11 and Java 8 built on top of Akka IO and Akka Streams.
Only MongoDB 2.6+ is supported.
Don't hesitate to ask questions in the Tepkin Google Group
Tepkin is a young but very active project and absolutely needs your help. Good ways to contribute include:
Please read our Scala Guide first: https://github.com/fehmicansaglam/tepkin/wiki/Scala-Guide
Latest stable Tepkin release is 0.1 and is available on Maven Central.
Scala developers, add the following dependency:
libraryDependencies ++= Seq(
"net.fehmicansaglam" %% "tepkin" % "0.1"
)
Java developers, net.fehmicansaglam.tepkin.api
package is intended to be used from Java. To use the package, add the following dependency to your pom.xml:
<dependency>
<groupId>net.fehmicansaglam</groupId>
<artifactId>tepkin-java_2.11</artifactId>
<version>0.1</version>
</dependency>
Or if you want to be on the bleeding edge using snapshots, latest snapshot release is 0.2-SNAPSHOT.
Scala developers, add the following repository and dependency:
resolvers += "Sonatype Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/"
libraryDependencies ++= Seq(
"net.fehmicansaglam" %% "tepkin" % "0.2-SNAPSHOT"
)
Java developers add the following repository and dependency to your pom.xml:
<repository>
<id>Sonatype Snapshots</id>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
</repository>
<dependency>
<groupId>net.fehmicansaglam</groupId>
<artifactId>tepkin-java_2.11</artifactId>
<version>0.2-SNAPSHOT</version>
</dependency>
To construct a Bson document, you can either create BsonElements and join them with ~
or create a document directly.
import net.fehmicansaglam.bson.BsonDsl._
import net.fehmicansaglam.bson.Implicits._
import net.fehmicansaglam.bson.element.BsonObjectId
import org.joda.time.DateTime
// Construct a BsonDocument from BsonElements
val element = "name" := "Johny"
val document = element ~
("surname" := "Doe") ~
("age" := 28) ~
("months" := $array(1, 2, 3))
// Construct a BsonDocument
val document = $document(
"_id" := BsonObjectId.generate,
"name" := "Johny",
"surname" := "Doe",
"age" := 28,
"months" := $array(1, 2, 3),
"details" := $document(
"salary" := 455.5,
"inventory" := $array("a", 3.5, 1L, true),
"birthday" := new DateTime(1987, 3, 5, 0, 0)
)
)
There is an implicit conversion from any BsonElement
to BsonDocument
for convenience.
import net.fehmicansaglam.bson.BsonDocument
import net.fehmicansaglam.bson.element.BsonElement
import net.fehmicansaglam.bson.BsonDsl._
import net.fehmicansaglam.bson.Implicits._
val element: BsonElement = "name" := "fehmi"
val document: BsonDocument = "name" := "fehmi"
To make a connection to MongoDB, use the MongoClient
interface.
import net.fehmicansaglam.tepkin.MongoClient
// Connect to a MongoDB node.
val client = MongoClient("mongodb://localhost")
MongoClient
manages multiple connection pools to MongoDB instances and therefore is a heavy class. Most of the time you will need only one MongoClient
instance per application.
Use MongoDatabase
and MongoCollection
in order to obtain a reference to a database and a collection.
// Obtain a reference to the "tepkin" database
val db = client("tepkin")
// Obtain a reference to the "example" collection in "tepkin" database.
val collection = db("example")
MongoDatabase
and MongoCollection
are lightweight classes and may be instantiated more than once if needed. However they are both immutable and reusable.
All methods in the MongoCollection
class need an implicit scala.concurrent.ExecutionContext
and an akka.util.Timeout
. You can define a default timeout and use the client's execution context as shown below:
import akka.util.Timeout
import scala.concurrent.duration._
// val client = ...
import client.ec
implicit val timeout: Timeout = 5.seconds
import net.fehmicansaglam.bson.BsonDocument
import net.fehmicansaglam.bson.BsonDsl._
import net.fehmicansaglam.bson.Implicits._
val query: BsonDocument = "name" := "fehmi"
val source = collection.find(query)
All find methods in Tepkin return an akka.stream.scaladsl.Source[List[BsonDocument], ActorRef]
. Then you can use any method in Akka Streams to process the returned stream.
import net.fehmicansaglam.bson.BsonDsl._
import net.fehmicansaglam.bson.Implicits._
val document = ("name" := "fehmi") ~ ("surname" := "saglam")
collection.insert(document)
import net.fehmicansaglam.bson.BsonDsl._
import net.fehmicansaglam.bson.Implicits._
val documents = (1 to 100).map(i => $document("name" := s"fehmi$i"))
collection.insert(documents)
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.Source
import net.fehmicansaglam.bson.BsonDocument
import net.fehmicansaglam.bson.BsonDsl._
import net.fehmicansaglam.bson.Implicits._
import scala.collection.immutable.Iterable
implicit val mat = ActorFlowMaterializer()(client.context)
val documents: Source[List[BsonDocument], Unit] = Source {
Iterable.tabulate(100) { _ =>
(1 to 1000).map(i => $document("name" := s"fehmi$i")).toList
}
}
collection.insertFromSource(documents).runForeach(_ => ())
import net.fehmicansaglam.bson.BsonDsl._
import net.fehmicansaglam.bson.Implicits._
import scala.concurrent.Future
val document = ("name" := "fehmi") ~ ("surname" := "saglam")
val result: Future[UpdateResult] = for {
insert <- collection.insert(document)
update <- collection.update(
query = "name" := "fehmi",
update = $set("name" := "fehmi can")
)
} yield update
Update and return the old document.
import net.fehmicansaglam.bson.BsonDsl._
import net.fehmicansaglam.bson.Implicits._
collection.findAndUpdate(
query = Some("name" := "fehmi"),
update = $set("name" := "fehmi can")
)
Update and return the updated document.
import net.fehmicansaglam.bson.BsonDsl._
import net.fehmicansaglam.bson.Implicits._
collection.findAndUpdate(
query = Some("name" := "fehmi"),
update = $set("name" := "fehmi can"),
returnNew = true
)
import net.fehmicansaglam.bson.BsonDsl._
import net.fehmicansaglam.bson.Implicits._
import net.fehmicansaglam.tepkin.protocol.command.Index
collection.createIndexes(Index(name = "name_surname", key = ("name" := 1) ~ ("surname" := 1)))
import net.fehmicansaglam.tepkin.api.*;
MongoClient mongoClient = MongoClient.create("mongodb://localhost");
MongoCollection collection = mongoClient.db("tepkin").collection("test");
BsonDocument document = BsonDocumentBuilder.create().addString("name", "fehmi").build();
FiniteDuration timeout = Duration.create(5, TimeUnit.SECONDS);
CompletableFuture<Optional<BsonDocument>> cf = collection
.insert(document, mongoClient.ec(), timeout)
.thenCompose(insert -> collection.findOne(mongoClient.ec(), timeout));
Optional<BsonDocument> actual = cf.get(5, TimeUnit.SECONDS);