From ccd42f6fb464fa11e2fd5487a41cf4dac1dc0e09 Mon Sep 17 00:00:00 2001
From: Jan De Bleser
Date: Fri, 24 May 2019 15:11:37 +0200
Subject: [PATCH 1/5] Saving dates as the iso standard

---
 .../org/splink/cpipe/JsonColumnParser.scala | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/org/splink/cpipe/JsonColumnParser.scala b/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
index 2841718..27bfab4 100644
--- a/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
+++ b/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
@@ -1,5 +1,9 @@
 package org.splink.cpipe
 
+import java.time.format.DateTimeFormatter
+import java.time.{ZoneId, ZonedDateTime}
+import java.util.Date
+
 import com.datastax.driver.core.{DataType, Row}
 import play.api.libs.json._
 
@@ -8,10 +12,16 @@ import scala.util.{Failure, Success, Try}
 
 object JsonColumnParser {
 
-  case class Column(name: String, value: String, typ: DataType)
+  case class Column(name: String, value: Object, typ: DataType)
+
+  private val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
 
   def column2Json(column: Column) = {
-    val sanitized = stripControlChars(column.value)
+    val sanitized: String = column.value match {
+      case date: Date => dateFormatter.format(ZonedDateTime.ofInstant(date.toInstant, ZoneId.of("UTC")))
+      case _ => stripControlChars(column.value.toString)
+    }
+
     Try(Json.parse(sanitized)) match {
       case Success(json) =>
         val r = json match {
@@ -28,7 +38,7 @@
 
   def row2Json(row: Row) =
     row.getColumnDefinitions.iterator.asScala.flatMap { definition =>
-      Try(row.getObject(definition.getName).toString) match {
+      Try(row.getObject(definition.getName)) match {
        case Success(value) =>
          column2Json {
            Column(definition.getName, value, definition.getType)
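
Patch 1 widens Column.value from String to Object so timestamp columns arrive as java.util.Date, then serializes them through a fixed UTC pattern instead of relying on Date.toString. The same result can be had with DateTimeFormatter.withZone; a minimal sketch, not the patch's exact code (object and method names are illustrative):

    import java.time.ZoneId
    import java.time.format.DateTimeFormatter
    import java.util.Date

    object IsoDates {
      // Pin the zone to UTC, then apply the patch's pattern. The trailing 'Z'
      // is a literal, which is only honest because the zone is forced to UTC.
      private val isoUtc =
        DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneId.of("UTC"))

      def toIso(date: Date): String = isoUtc.format(date.toInstant)
    }

DateTimeFormatter.ISO_INSTANT would produce a comparable ISO 8601 string without a hand-written pattern, at the cost of a variable number of fraction digits.
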
From 9647a0293503edafb6f687745a85b22ceb863dae Mon Sep 17 00:00:00 2001
From: Jan De Bleser
Date: Fri, 24 May 2019 15:11:45 +0200
Subject: [PATCH 2/5] Improving load speed

---
 .../splink/cpipe/processors/Importer.scala | 28 +++++++++++++------
 1 file changed, 19 insertions(+), 9 deletions(-)

diff --git a/src/main/scala/org/splink/cpipe/processors/Importer.scala b/src/main/scala/org/splink/cpipe/processors/Importer.scala
index 1b7de0e..a256c44 100644
--- a/src/main/scala/org/splink/cpipe/processors/Importer.scala
+++ b/src/main/scala/org/splink/cpipe/processors/Importer.scala
@@ -1,11 +1,17 @@
 package org.splink.cpipe.processors
 
+import java.util.concurrent.TimeUnit
+
 import com.datastax.driver.core.Session
-import org.splink.cpipe.{JsonFrame, Output, Rps}
 import org.splink.cpipe.config.Config
+import org.splink.cpipe.{JsonFrame, Output, Rps}
 
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
 import scala.io.Source
 
+
 class Importer extends Processor {
 
   import org.splink.cpipe.JsonColumnParser._
@@ -14,15 +20,19 @@
 
   override def process(session: Session, config: Config): Int = {
     val frame = new JsonFrame()
 
-    Source.stdin.getLines().foreach { line =>
-      frame.push(line.toCharArray).foreach { result =>
-        string2Json(result).map { json =>
-
-          rps.compute()
-          if (config.flags.showProgress) Output.update(s"${rps.count} rows at $rps rows/sec.")
-
-          session.execute(json2Query(json, config.selection.table))
+    Source.stdin.getLines().flatMap { line =>
+      frame.push(line.toCharArray)
+    }.grouped(500).foreach { group: Iterable[String] =>
+      group.map { jsonStr =>
+        Future {
+          string2Json(jsonStr).map { json =>
+            rps.compute()
+            if (config.flags.showProgress) Output.update(s"${rps.count} rows at $rps rows/sec.")
+            session.execute(json2Query(json, config.selection.table))
+          }.get
         }
+      }.foreach { future =>
+        Await.ready(future, Duration(10, TimeUnit.SECONDS)).recover{case e: Exception =>println(e)}
       }
     }

From 9d8e720923a0758dfc7616a525b26e2b6f670b54 Mon Sep 17 00:00:00 2001
From: Jan De Bleser
Date: Fri, 24 May 2019 17:42:22 +0200
Subject: [PATCH 3/5] Revert "Improving load speed"

This reverts commit 9647a029
---
 .../splink/cpipe/processors/Importer.scala | 28 ++++++-------------
 1 file changed, 9 insertions(+), 19 deletions(-)

diff --git a/src/main/scala/org/splink/cpipe/processors/Importer.scala b/src/main/scala/org/splink/cpipe/processors/Importer.scala
index a256c44..1b7de0e 100644
--- a/src/main/scala/org/splink/cpipe/processors/Importer.scala
+++ b/src/main/scala/org/splink/cpipe/processors/Importer.scala
@@ -1,17 +1,11 @@
 package org.splink.cpipe.processors
 
-import java.util.concurrent.TimeUnit
-
 import com.datastax.driver.core.Session
-import org.splink.cpipe.config.Config
 import org.splink.cpipe.{JsonFrame, Output, Rps}
+import org.splink.cpipe.config.Config
 
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future}
 import scala.io.Source
 
-
 class Importer extends Processor {
 
   import org.splink.cpipe.JsonColumnParser._
@@ -20,19 +14,15 @@
 
   override def process(session: Session, config: Config): Int = {
     val frame = new JsonFrame()
 
-    Source.stdin.getLines().flatMap { line =>
-      frame.push(line.toCharArray)
-    }.grouped(500).foreach { group: Iterable[String] =>
-      group.map { jsonStr =>
-        Future {
-          string2Json(jsonStr).map { json =>
-            rps.compute()
-            if (config.flags.showProgress) Output.update(s"${rps.count} rows at $rps rows/sec.")
-            session.execute(json2Query(json, config.selection.table))
-          }.get
+    Source.stdin.getLines().foreach { line =>
+      frame.push(line.toCharArray).foreach { result =>
+        string2Json(result).map { json =>
+
+          rps.compute()
+          if (config.flags.showProgress) Output.update(s"${rps.count} rows at $rps rows/sec.")
+
+          session.execute(json2Query(json, config.selection.table))
         }
-      }.foreach { future =>
-        Await.ready(future, Duration(10, TimeUnit.SECONDS)).recover{case e: Exception =>println(e)}
       }
     }
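
Patches 2 and 3 record an experiment and its revert: wrapping each blocking session.execute in a Future on the global pool still ties up one thread per in-flight row, and the per-group Await caps concurrency at the pool size. The driver's native async path avoids blocked threads altogether; a sketch using executeAsync with a semaphore for back-pressure (the helper name and in-flight cap are illustrative, not part of the patch series):

    import java.util.concurrent.Semaphore
    import com.datastax.driver.core.Session
    import scala.concurrent.ExecutionContext

    object AsyncImport {
      def run(session: Session, queries: Iterator[String], maxInFlight: Int = 128): Unit = {
        val permits = new Semaphore(maxInFlight)
        queries.foreach { q =>
          permits.acquire()                     // back-pressure: cap concurrent requests
          val future = session.executeAsync(q)
          future.addListener(new Runnable {     // fires on success and on failure
            override def run(): Unit = permits.release()
          }, ExecutionContext.global)
        }
        permits.acquire(maxInFlight)            // drain: wait for outstanding writes
      }
    }
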
From fd3dff9cbeceda721cd37d07594377ac408a49a5 Mon Sep 17 00:00:00 2001
From: Jan De Bleser
Date: Fri, 24 May 2019 17:48:05 +0200
Subject: [PATCH 4/5] Added importer that uses batches of prepared statements.

---
 src/main/scala/org/splink/cpipe/CPipe.scala | 7 ++-
 .../org/splink/cpipe/JsonColumnParser.scala | 43 ++++++++++++++++---
 .../org/splink/cpipe/config/Arguments.scala | 5 ++-
 .../splink/cpipe/processors/Importer2.scala | 38 ++++++++++++++++
 4 files changed, 84 insertions(+), 9 deletions(-)
 create mode 100644 src/main/scala/org/splink/cpipe/processors/Importer2.scala

diff --git a/src/main/scala/org/splink/cpipe/CPipe.scala b/src/main/scala/org/splink/cpipe/CPipe.scala
index ac3d654..0149ed2 100644
--- a/src/main/scala/org/splink/cpipe/CPipe.scala
+++ b/src/main/scala/org/splink/cpipe/CPipe.scala
@@ -1,7 +1,8 @@
 package org.splink.cpipe
 
-import org.splink.cpipe.processors.{Exporter, Exporter2, Importer}
-import org.splink.cpipe.config.{Config, Arguments}
+import org.splink.cpipe.processors.{Exporter, Exporter2, Importer, Importer2}
+import org.splink.cpipe.config.{Arguments, Config}
+
 import scala.language.implicitConversions
 import scala.util.{Failure, Success, Try}
 
@@ -23,6 +24,8 @@
     config.mode match {
       case "import" =>
        new Importer().process(session, config)
+      case "import2" =>
+        new Importer2().process(session, config)
       case "export" =>
        new Exporter().process(session, config)
       case "export2" =>
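
The new import2 mode rests on one idea: with fixed columns, the INSERT is prepared once and each row only ships bound values, so the cluster parses the CQL a single time. The JsonColumnParser.scala helpers in the next section build that statement from a row's field names; a reduced sketch of the pattern (keyspace, table and columns are illustrative):

    import com.datastax.driver.core.{BatchStatement, Session}

    object PreparedInsert {
      // Prepare once, bind per row: only the values travel with each statement.
      def insertAll(session: Session, rows: Seq[(Integer, String)]): Unit = {
        val insert = session.prepare("INSERT INTO ks.users (id, name) VALUES (?, ?);")
        val batch = new BatchStatement()
        rows.foreach { case (id, name) => batch.add(insert.bind(id, name)) }
        session.execute(batch)
      }
    }
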
diff --git a/src/main/scala/org/splink/cpipe/JsonColumnParser.scala b/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
index 27bfab4..b573976 100644
--- a/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
+++ b/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
@@ -1,10 +1,9 @@
 package org.splink.cpipe
 
-import java.time.format.DateTimeFormatter
-import java.time.{ZoneId, ZonedDateTime}
+import java.lang.{Double, Boolean}
 import java.util.Date
 
-import com.datastax.driver.core.{DataType, Row}
+import com.datastax.driver.core.{BatchStatement, DataType, PreparedStatement, Row, Session}
 import play.api.libs.json._
 
@@ -14,11 +13,11 @@ object JsonColumnParser {
 
   case class Column(name: String, value: Object, typ: DataType)
 
-  private val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+  private val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
 
   def column2Json(column: Column) = {
     val sanitized: String = column.value match {
-      case date: Date => dateFormatter.format(ZonedDateTime.ofInstant(date.toInstant, ZoneId.of("UTC")))
+      case date: Date => dateFormat.format(date)
       case _ => stripControlChars(column.value.toString)
     }
 
@@ -83,6 +82,40 @@
 
   }
 
+  def json2PreparedStatement(table: String, json: JsObject, session: Session): PreparedStatement = {
+    val str = s"INSERT INTO $table ( ${json.fields.map(_._1).mkString(", ")} ) VALUES ( ${json.fields.map(_ => "?").mkString(", ")} );"
+    session.prepare(str)
+  }
+
+  def getStringToObjectMappingForTable(session: Session, table: String): Map[String, String => Object] = {
+    val queryResult = session.execute(s"select * from $table limit 1")
+    queryResult.getColumnDefinitions.asScala.map{
+      definition => definition.getName -> getStringToObjectConversionMethod(definition.getType)
+    }.toMap
+  }
+
+  def getStringToObjectConversionMethod(dataType: DataType): String => Object = (s: String) => {
+    dataType.getName match {
+      case DataType.Name.DATE => dateFormat.parse(s)
+      case DataType.Name.TIMESTAMP => dateFormat.parse(s)
+      case DataType.Name.DOUBLE => new Double(s.toDouble)
+      case DataType.Name.INT => new Integer(s.toInt)
+      case DataType.Name.VARCHAR => s
+      case DataType.Name.BOOLEAN => new Boolean(s == "true")
+      case _ => throw new IllegalArgumentException(s"Please add a mapping for the '${dataType.getName}' type")
+    }
+  }
+
+  def jsValueToScalaObject(name: String, jsValue: JsValue, objectMapping: Map[String, String => Object]) : Object = {
+    val v = jsValue.toString.stripPrefix("\"").stripSuffix("\"")
+    objectMapping.get(name).getOrElse(throw new IllegalArgumentException(s"$name was not found in the map $objectMapping"))(v)
+  }
+
+  def addJsonToBatch(json: JsObject, preparedStatement: PreparedStatement, batch: BatchStatement, objectMapping: Map[String, String => Object]): Unit = {
+    val values = json.fields.map { v => jsValueToScalaObject(v._1, v._2, objectMapping) }
+    batch.add(preparedStatement.bind(values : _*))
+  }
+
   import java.util.regex.Pattern
 
   val pattern = Pattern.compile("[\\u0000-\\u001f]")

diff --git a/src/main/scala/org/splink/cpipe/config/Arguments.scala b/src/main/scala/org/splink/cpipe/config/Arguments.scala
index ce94919..10b8542 100644
--- a/src/main/scala/org/splink/cpipe/config/Arguments.scala
+++ b/src/main/scala/org/splink/cpipe/config/Arguments.scala
@@ -65,8 +65,9 @@
   val compression = choice(Seq("ON", "OFF"), default = Some("ON"),
     descr = "Use LZ4 compression and trade reduced network traffic for CPU cycles. Defaults to ON")
 
-  val mode = choice(choices = Seq("import", "export", "export2"), required = true,
+  val mode = choice(choices = Seq("import", "import2", "export", "export2"), required = true,
     descr = "Select the mode. Choose mode 'import' to import data. " +
+      "Choose mode 'import2' to import data with a prepared statement (faster, but only for tables with fixed columns); " +
      "Choose mode 'export' to export data (optional with a filter); " +
      "Choose mode 'export2' to export data using token ranges to increase performance and reduce load on the cluster. " +
      "'export2' mode cannot be combined with a filter and it requires that the cluster uses Murmur3Partitioner. " +
@@ -79,4 +80,4 @@
   }
 
   verify()
-}
\ No newline at end of file
+}
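
One detail worth flagging before the new processor: `new BatchStatement` defaults to a LOGGED batch, which buys atomicity via the batch log at the cost of extra coordinator work. For bulk import of independent rows an UNLOGGED batch is the usual choice; whether that trade-off is acceptable here is an assumption, not something the patch states:

    import com.datastax.driver.core.BatchStatement

    object BatchKind {
      // Skips the batch-log write; rows are no longer applied atomically as a unit.
      val bulkLoad = new BatchStatement(BatchStatement.Type.UNLOGGED)
    }
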
" + @@ -79,4 +80,4 @@ class Arguments(arguments: Seq[String]) extends ScallopConf(arguments) { } verify() -} \ No newline at end of file +} diff --git a/src/main/scala/org/splink/cpipe/processors/Importer2.scala b/src/main/scala/org/splink/cpipe/processors/Importer2.scala new file mode 100644 index 0000000..889095c --- /dev/null +++ b/src/main/scala/org/splink/cpipe/processors/Importer2.scala @@ -0,0 +1,38 @@ +package org.splink.cpipe.processors + +import com.datastax.driver.core.{BatchStatement, PreparedStatement, Session} +import org.splink.cpipe.config.Config +import org.splink.cpipe.{JsonFrame, Output, Rps} + +import scala.io.Source + +class Importer2 extends Processor { + + import org.splink.cpipe.JsonColumnParser._ + + val rps = new Rps() + + override def process(session: Session, config: Config): Int = { + val frame = new JsonFrame() + var statement: PreparedStatement = null + val dataTypeMapping = getStringToObjectMappingForTable(session, config.selection.table) + + Source.stdin.getLines().flatMap { line => + frame.push(line.toCharArray) + }.grouped(500).foreach { group => + val batch = new BatchStatement + group.foreach { str => + string2Json(str).foreach { json => + if (statement == null) { + statement = json2PreparedStatement(config.selection.table, json, session) + } + addJsonToBatch(json, statement, batch, dataTypeMapping) + rps.compute() + } + } + if (config.flags.showProgress) Output.update(s"${rps.count} rows at $rps rows/sec.") + session.execute(batch) + } + rps.count + } +} From 4d8efe877b24898ccf8fabc9180c536b6cbfbe50 Mon Sep 17 00:00:00 2001 From: Jan De Bleser Date: Mon, 27 May 2019 10:31:12 +0200 Subject: [PATCH 5/5] Added --batch-size argument --- src/main/scala/org/splink/cpipe/config/Arguments.scala | 3 +++ src/main/scala/org/splink/cpipe/config/Config.scala | 5 +++-- src/main/scala/org/splink/cpipe/processors/Importer2.scala | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/main/scala/org/splink/cpipe/config/Arguments.scala b/src/main/scala/org/splink/cpipe/config/Arguments.scala index 10b8542..7e82f15 100644 --- a/src/main/scala/org/splink/cpipe/config/Arguments.scala +++ b/src/main/scala/org/splink/cpipe/config/Arguments.scala @@ -54,6 +54,9 @@ class Arguments(arguments: Seq[String]) extends ScallopConf(arguments) { val fetchSize = opt[Int](default = Some(5000), descr = "The amount of rows which is retrieved simultaneously. Defaults to 5000.") + val batchSize = opt[Int](default = Some(500), + descr = "The amount of rows which is saved simultaneously when using mode import2. Defaults to 500.") + val threads = opt[Int](default = Some(32), descr = "The amount of parallelism used in export2 mode. 
From 4d8efe877b24898ccf8fabc9180c536b6cbfbe50 Mon Sep 17 00:00:00 2001
From: Jan De Bleser
Date: Mon, 27 May 2019 10:31:12 +0200
Subject: [PATCH 5/5] Added --batch-size argument

---
 src/main/scala/org/splink/cpipe/config/Arguments.scala | 3 +++
 src/main/scala/org/splink/cpipe/config/Config.scala | 5 +++--
 src/main/scala/org/splink/cpipe/processors/Importer2.scala | 2 +-
 3 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/org/splink/cpipe/config/Arguments.scala b/src/main/scala/org/splink/cpipe/config/Arguments.scala
index 10b8542..7e82f15 100644
--- a/src/main/scala/org/splink/cpipe/config/Arguments.scala
+++ b/src/main/scala/org/splink/cpipe/config/Arguments.scala
@@ -54,6 +54,9 @@
   val fetchSize = opt[Int](default = Some(5000),
     descr = "The amount of rows which is retrieved simultaneously. Defaults to 5000.")
 
+  val batchSize = opt[Int](default = Some(500),
+    descr = "The amount of rows which is saved simultaneously when using mode import2. Defaults to 500.")
+
   val threads = opt[Int](default = Some(32),
     descr = "The amount of parallelism used in export2 mode. Defaults to 32 parallel requests.")
 
diff --git a/src/main/scala/org/splink/cpipe/config/Config.scala b/src/main/scala/org/splink/cpipe/config/Config.scala
index 9778b66..df99f37 100644
--- a/src/main/scala/org/splink/cpipe/config/Config.scala
+++ b/src/main/scala/org/splink/cpipe/config/Config.scala
@@ -18,6 +18,7 @@
       verbose <- args.verbose.toOption
       threads <- args.threads.toOption
       fetchSize <- args.fetchSize.toOption
+      batchSize <- args.batchSize.toOption
       useCompression <- args.compression.toOption.map {
         case c if c == "ON" => true
         case _ => false
@@ -42,7 +43,7 @@
         Selection(keyspace, table, filter),
         Credentials(username, password),
         Flags(!beQuiet, useCompression, verbose),
-        Settings(fetchSize, consistencyLevel, threads))
+        Settings(fetchSize, batchSize, consistencyLevel, threads))
     }
   }
 
@@ -54,4 +55,4 @@ final case class Credentials(username: String, password: String)
 
 final case class Flags(showProgress: Boolean, useCompression: Boolean, verbose: Boolean)
 
-final case class Settings(fetchSize: Int, consistencyLevel: ConsistencyLevel, threads: Int)
+final case class Settings(fetchSize: Int, batchSize: Int, consistencyLevel: ConsistencyLevel, threads: Int)

diff --git a/src/main/scala/org/splink/cpipe/processors/Importer2.scala b/src/main/scala/org/splink/cpipe/processors/Importer2.scala
index 889095c..7695188 100644
--- a/src/main/scala/org/splink/cpipe/processors/Importer2.scala
+++ b/src/main/scala/org/splink/cpipe/processors/Importer2.scala
@@ -19,7 +19,7 @@
 
     Source.stdin.getLines().flatMap { line =>
       frame.push(line.toCharArray)
-    }.grouped(500).foreach { group =>
+    }.grouped(config.settings.batchSize).foreach { group =>
       val batch = new BatchStatement
       group.foreach { str =>
        string2Json(str).foreach { json =>
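
With --batch-size exposed, one knob remains implicit: every group becomes a single batch regardless of how many partitions it touches, and multi-partition batches make the coordinator fan out to many replicas. A hypothetical refinement, not part of this series, is grouping rows by partition key before batching (partitionKeyOf is an assumed helper that reads the key column from the row's JSON):

    import play.api.libs.json.JsObject

    object PartitionAwareBatching {
      // Rows sharing a partition key land in the same batch, up to maxSize rows.
      def batches(rows: Seq[JsObject], partitionKeyOf: JsObject => String,
                  maxSize: Int): Iterator[Seq[JsObject]] =
        rows.groupBy(partitionKeyOf).valuesIterator.flatMap(_.grouped(maxSize))
    }

Cassandra itself enforces batch_size_warn_threshold_in_kb and batch_size_fail_threshold_in_kb, so very wide rows may need a smaller --batch-size than the 500-row default.
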