7 changes: 5 additions & 2 deletions src/main/scala/org/splink/cpipe/CPipe.scala
@@ -1,7 +1,8 @@
package org.splink.cpipe

import org.splink.cpipe.processors.{Exporter, Exporter2, Importer}
import org.splink.cpipe.config.{Config, Arguments}
import org.splink.cpipe.processors.{Exporter, Exporter2, Importer, Importer2}
import org.splink.cpipe.config.{Arguments, Config}

import scala.language.implicitConversions
import scala.util.{Failure, Success, Try}

@@ -23,6 +24,8 @@ object CPipe {
config.mode match {
case "import" =>
new Importer().process(session, config)
case "import2" =>
new Importer2().process(session, config)
case "export" =>
new Exporter().process(session, config)
case "export2" =>
51 changes: 47 additions & 4 deletions src/main/scala/org/splink/cpipe/JsonColumnParser.scala
@@ -1,17 +1,26 @@
package org.splink.cpipe

import com.datastax.driver.core.{DataType, Row}
import java.lang.{Double, Boolean}
import java.util.Date

import com.datastax.driver.core.{BatchStatement, DataType, PreparedStatement, Row, Session}
import play.api.libs.json._

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

object JsonColumnParser {

case class Column(name: String, value: String, typ: DataType)
case class Column(name: String, value: Object, typ: DataType)

private val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")

def column2Json(column: Column) = {
val sanitized = stripControlChars(column.value)
val sanitized: String = column.value match {
case date: Date => dateFormat.format(date)
case _ => stripControlChars(column.value.toString)
}

Try(Json.parse(sanitized)) match {
case Success(json) =>
val r = json match {
@@ -28,7 +37,7 @@ object JsonColumnParser {

def row2Json(row: Row) =
row.getColumnDefinitions.iterator.asScala.flatMap { definition =>
Try(row.getObject(definition.getName).toString) match {
Try(row.getObject(definition.getName)) match {
case Success(value) =>
column2Json {
Column(definition.getName, value, definition.getType)
@@ -73,6 +82,40 @@ object JsonColumnParser {
}


def json2PreparedStatement(table: String, json: JsObject, session: Session): PreparedStatement = {
val str = s"INSERT INTO $table ( ${json.fields.map(_._1).mkString(", ")} ) VALUES ( ${json.fields.map(_ => "?").mkString(", ")} );"
session.prepare(str)
}

def getStringToObjectMappingForTable(session: Session, table: String): Map[String, String => Object] = {
val queryResult = session.execute(s"select * from $table limit 1")
queryResult.getColumnDefinitions.asScala.map{
definition => definition.getName -> getStringToObjectConversionMethod(definition.getType)
}.toMap
}

def getStringToObjectConversionMethod(dataType: DataType): String => Object = (s: String) => {
dataType.getName match {
case DataType.Name.DATE => dateFormat.parse(s)
case DataType.Name.TIMESTAMP => dateFormat.parse(s)
case DataType.Name.DOUBLE => new Double(s.toDouble)
case DataType.Name.INT => new Integer(s.toInt)
case DataType.Name.VARCHAR => s
case DataType.Name.BOOLEAN => new Boolean(s == "true")
case _ => throw new IllegalArgumentException(s"Please add a mapping for the '${dataType.getName}' type")
}
}

def jsValueToScalaObject(name: String, jsValue: JsValue, objectMapping: Map[String, String => Object]) : Object = {
val v = jsValue.toString.stripPrefix("\"").stripSuffix("\"")
objectMapping.get(name).getOrElse(throw new IllegalArgumentException(s"$name was not found in the map $objectMapping"))(v)
}

def addJsonToBatch(json: JsObject, preparedStatement: PreparedStatement, batch: BatchStatement, objectMapping: Map[String, String => Object]): Unit = {
val values = json.fields.map { v => jsValueToScalaObject(v._1, v._2, objectMapping) }
batch.add(preparedStatement.bind(values : _*))
}

import java.util.regex.Pattern

val pattern = Pattern.compile("[\\u0000-\\u001f]")
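The statement-building helpers added above are easiest to follow with a tiny worked example. The sketch below only mirrors the string construction in json2PreparedStatement and never talks to a cluster; the table name and JSON fields are made-up placeholders, and it assumes play-json is on the classpath.

```scala
import play.api.libs.json.{JsObject, Json}

object InsertStatementSketch extends App {
  // Made-up table and row, purely for illustration.
  val table = "demo.users"
  val json: JsObject = Json.obj("id" -> "42", "name" -> "alice")

  // Same string construction as json2PreparedStatement, minus session.prepare.
  val cql =
    s"INSERT INTO $table ( ${json.fields.map(_._1).mkString(", ")} ) VALUES ( ${json.fields.map(_ => "?").mkString(", ")} );"

  println(cql) // e.g. INSERT INTO demo.users ( id, name ) VALUES ( ?, ? );
}
```

The statement is prepared once, from the first row that arrives; every later row is only bound against it by addJsonToBatch, with each value converted through the per-column mapping from getStringToObjectMappingForTable. That is presumably why import2 is described as working only for tables with a fixed set of columns.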
8 changes: 6 additions & 2 deletions src/main/scala/org/splink/cpipe/config/Arguments.scala
@@ -54,6 +54,9 @@ class Arguments(arguments: Seq[String]) extends ScallopConf(arguments) {
val fetchSize = opt[Int](default = Some(5000),
descr = "The amount of rows which is retrieved simultaneously. Defaults to 5000.")

val batchSize = opt[Int](default = Some(500),
descr = "The amount of rows which is saved simultaneously when using mode import2. Defaults to 500.")

val threads = opt[Int](default = Some(32),
descr = "The amount of parallelism used in export2 mode. Defaults to 32 parallel requests.")

@@ -65,8 +68,9 @@ class Arguments(arguments: Seq[String]) extends ScallopConf(arguments) {
val compression = choice(Seq("ON", "OFF"), default = Some("ON"),
descr = "Use LZ4 compression and trade reduced network traffic for CPU cycles. Defaults to ON")

val mode = choice(choices = Seq("import", "export", "export2"), required = true,
val mode = choice(choices = Seq("import", "import2", "export", "export2"), required = true,
descr = "Select the mode. Choose mode 'import' to import data. " +
"Choose mode 'import2' to import data with a prepared statement (faster, but only for tables with fixed columns); " +
"Choose mode 'export' to export data (optional with a filter); " +
"Choose mode 'export2' to export data using token ranges to increase performance and reduce load on the cluster. " +
"'export2' mode cannot be combined with a filter and it requires that the cluster uses Murmur3Partitioner. " +
@@ -79,4 +83,4 @@ class Arguments(arguments: Seq[String]) extends ScallopConf(arguments) {
}

verify()
}
}
5 changes: 3 additions & 2 deletions src/main/scala/org/splink/cpipe/config/Config.scala
@@ -18,6 +18,7 @@ case object Config {
verbose <- args.verbose.toOption
threads <- args.threads.toOption
fetchSize <- args.fetchSize.toOption
batchSize <- args.batchSize.toOption
useCompression <- args.compression.toOption.map {
case c if c == "ON" => true
case _ => false
@@ -42,7 +43,7 @@
Selection(keyspace, table, filter),
Credentials(username, password),
Flags(!beQuiet, useCompression, verbose),
Settings(fetchSize, consistencyLevel, threads))
Settings(fetchSize, batchSize, consistencyLevel, threads))
}
}

@@ -54,4 +55,4 @@ final case class Credentials(username: String, password: String)

final case class Flags(showProgress: Boolean, useCompression: Boolean, verbose: Boolean)

final case class Settings(fetchSize: Int, consistencyLevel: ConsistencyLevel, threads: Int)
final case class Settings(fetchSize: Int, batchSize: Int, consistencyLevel: ConsistencyLevel, threads: Int)
38 changes: 38 additions & 0 deletions src/main/scala/org/splink/cpipe/processors/Importer2.scala
@@ -0,0 +1,38 @@
package org.splink.cpipe.processors

import com.datastax.driver.core.{BatchStatement, PreparedStatement, Session}
import org.splink.cpipe.config.Config
import org.splink.cpipe.{JsonFrame, Output, Rps}

import scala.io.Source

class Importer2 extends Processor {

import org.splink.cpipe.JsonColumnParser._

val rps = new Rps()

override def process(session: Session, config: Config): Int = {
val frame = new JsonFrame()
var statement: PreparedStatement = null
val dataTypeMapping = getStringToObjectMappingForTable(session, config.selection.table)

Source.stdin.getLines().flatMap { line =>
frame.push(line.toCharArray)
}.grouped(config.settings.batchSize).foreach { group =>
val batch = new BatchStatement
group.foreach { str =>
string2Json(str).foreach { json =>
if (statement == null) {
statement = json2PreparedStatement(config.selection.table, json, session)
}
addJsonToBatch(json, statement, batch, dataTypeMapping)
rps.compute()
}
}
if (config.flags.showProgress) Output.update(s"${rps.count} rows at $rps rows/sec.")
session.execute(batch)
}
rps.count
}
}
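One step worth illustrating is the batching: grouped(config.settings.batchSize) chunks the lazy iterator of parsed rows, so at most one batch worth of rows is bound into a BatchStatement before session.execute runs. A minimal, self-contained sketch of that behaviour (the numbers are arbitrary stand-ins):

```scala
object GroupedSketch extends App {
  // Stand-in for the iterator of rows parsed from stdin.
  val rows = Iterator.range(0, 12)

  // With a batch size of 5, the 12 rows become batches of 5, 5 and 2.
  rows.grouped(5).foreach { group =>
    println(s"executing batch of ${group.size} rows")
  }
}
```

Because Source.stdin.getLines(), the flatMap through JsonFrame and grouped are all lazy, only roughly batchSize rows are materialised at a time, which keeps memory bounded regardless of input size.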