
Cassandra with Spark 2.0: Building a REST API!


In this tutorial, we will demonstrate how to build a REST service on Spark, using Akka HTTP as a sidekick ;) and Cassandra as the data store.

We have seen the power of Spark before, and when it is combined with Cassandra the right way it becomes even more powerful. We have already shown how to build a REST API on top of Spark and Couchbase in an earlier blog post, so this one is about doing the same thing with Cassandra.

So let's get started with the code:

Your build.sbt should look like this:

name := "cassandra-spark-akka-http-starter-kit"

version := "1.0"

scalaVersion := "2.11.8"

organization := "com.knoldus"

val akkaV = "2.4.5"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % "2.0.0",
  "org.apache.spark" % "spark-sql_2.11" % "2.0.0",
  "com.typesafe.akka" %% "akka-http-core" % akkaV,
  "com.typesafe.akka" %% "akka-http-experimental" % akkaV,
  "com.typesafe.akka" %% "akka-http-testkit" % akkaV % "test",
  "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaV,
  "org.scalatest" %% "scalatest" % "2.2.6" % "test",
  "com.datastax.spark" % "spark-cassandra-connector_2.11" % "2.0.0-M3",
  "net.liftweb" % "lift-json_2.11" % "2.6.2"
)

// We don't need the Scala library, Spark already includes it
assembleArtifact in assemblyPackageScala := false

assemblyMergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}

ivyScala := ivyScala.value map { _.copy(overrideScalaVersion = true) }

fork in run := true
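
The assembly settings above (assembleArtifact, assemblyMergeStrategy) come from the sbt-assembly plugin, which the build file alone does not enable. A minimal sketch of the matching project/plugins.sbt, assuming an sbt-assembly 0.14.x release (the exact version here is an assumption, not taken from the original project):

// project/plugins.sbt -- enables the assembly keys used in build.sbt above
// (plugin version is an assumption; any recent 0.14.x release should work)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

With the plugin in place, `sbt assembly` builds a fat jar you can hand to spark-submit, while `sbt run` is enough for local development (the build already forks the run into its own JVM).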

Database Access layer:

And your database access layer should look like this:

package com.knoldus.factories

import com.datastax.spark.connector._
import scala.util.Try

trait DatabaseAccess {

  import Context._

  // Persists a single user by turning it into a one-element RDD and writing it to Cassandra
  def create(user: User): Boolean =
    Try(sc.parallelize(Seq(user)).saveToCassandra(keyspace, tableName)).toOption.isDefined

  // Reads back the rows matching the given id
  def retrieve(id: String): Option[Array[User]] =
    Try(sc.cassandraTable[User](keyspace, tableName).where(s"id='$id'").collect()).toOption
}

object DatabaseAccess extends DatabaseAccess
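
The trait above relies on a Context object (for sc, keyspace and tableName) and on the User case class from com.knoldus.domain, neither of which is shown in the post. A minimal sketch of what they could look like; the keyspace, table name, master and connection host below are assumptions, not the author's exact code:

package com.knoldus.domain

// Domain object persisted to Cassandra; field names must match the table's column names
case class User(id: String, name: String, email: String)

package com.knoldus.factories

import org.apache.spark.{SparkConf, SparkContext}

// Assumed shape of the Context object used by DatabaseAccess.
// It presumes a table such as:
//   CREATE TABLE userspace.user (id text PRIMARY KEY, name text, email text);
object Context {
  val keyspace  = "userspace" // assumption
  val tableName = "user"      // assumption

  val conf = new SparkConf(true)
    .setMaster("local[*]")
    .setAppName("cassandra-spark-akka-http-starter-kit")
    .set("spark.cassandra.connection.host", "127.0.0.1") // where Cassandra is reachable

  val sc = new SparkContext(conf)
}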

Service Layer:

Now your routing file should look like this:

package com.knoldus.routes

import java.util.{Date, UUID}

import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{ExceptionHandler, Route}
import akka.stream.ActorMaterializer
import com.knoldus.domain.User
import com.knoldus.factories.DatabaseAccess
import net.liftweb.json._
import net.liftweb.json.Extraction._

trait SparkService extends DatabaseAccess {

  implicit val system: ActorSystem
  implicit val materializer: ActorMaterializer
  val logger = Logging(system, getClass)

  implicit def myExceptionHandler = ExceptionHandler {
    case e: ArithmeticException =>
      extractUri { uri =>
        complete(HttpResponse(StatusCodes.InternalServerError, entity = s"Data is not persisted and something went wrong"))
      }
  }

  implicit val formats: Formats = new DefaultFormats {
    outer =>
    override val typeHintFieldName = "type"
    override val typeHints = ShortTypeHints(List(classOf[String], classOf[Date]))
  }

  val sparkRoutes: Route = {
    get {
      path("create" / "name" / Segment / "email" / Segment) { (name: String, email: String) =>
        complete {
          val documentId = "user::" + UUID.randomUUID().toString
          try {
            val user = User(documentId, name, email)
            val isPersisted = create(user)
            if (isPersisted) {
              HttpResponse(StatusCodes.Created, entity = s"Data is successfully persisted with id $documentId")
            } else {
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
            }
          } catch {
            case ex: Throwable =>
              logger.error(ex, ex.getMessage)
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
          }
        }
      }
    } ~
      path("retrieve" / "id" / Segment) { (listOfIds: String) =>
        get {
          complete {
            try {
              val idAsRDD: Option[Array[User]] = retrieve(listOfIds)
              idAsRDD match {
                case Some(data) =>
                  HttpResponse(StatusCodes.OK, entity = data.headOption.fold("")(x => compact(render(decompose(x)))))
                case None =>
                  HttpResponse(StatusCodes.InternalServerError, entity = s"Data is not fetched and something went wrong")
              }
            } catch {
              case ex: Throwable =>
                logger.error(ex, ex.getMessage)
                HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for ids : $listOfIds")
            }
          }
        }
      }
  }
}
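
To actually serve these routes, something has to mix in SparkService, provide the actor system and materializer, and bind the routes to a port. The post leaves that part out; here is a minimal sketch, assuming localhost:8080 and the object and package names below (they are not from the original code):

package com.knoldus.boot

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import com.knoldus.routes.SparkService

// Hypothetical entry point wiring the SparkService routes to an HTTP server.
object StartApplication extends SparkService {

  // lazy so they are ready when the trait's logger initialises during construction
  implicit lazy val system = ActorSystem("cassandra-spark-akka-http")
  implicit lazy val materializer = ActorMaterializer()

  def main(args: Array[String]): Unit =
    // After this, the endpoints can be exercised with plain GETs, e.g.
    //   http://localhost:8080/create/name/john/email/john@example.com
    //   http://localhost:8080/retrieve/id/user::<uuid-returned-by-create>
    Http().bindAndHandle(sparkRoutes, "localhost", 8080)
}

Start it with `sbt run`, create a user with a GET to /create/name/<name>/email/<email>, and pass the id from the response to /retrieve/id/<id> to get the stored document back as JSON.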

This blog is a continuation of building your REST services with Spark and Couchbase; here we just changed the data store to Cassandra, so I have not explained each and every step. It contains just a simple implementation of the REST API! If you want the details, please take a look here:

Scala, Couchbase, Spark and Akka-http: A combinatory tutorial for starters

In the future, we will be continuing the same thing with Neo4j too 😛.

So stay tuned!

You can find the code here on my GitHub: shiv4nsh

If you have any questions, you can contact me here or on Twitter: @shiv4nsh

I would be happy to help.

Till then

happy hAKKAing!!!

