H2 and JDBC

Last updated on 19th April 2024

The other data formats on Data Export all make usage of the default built-in output that is managed either via configuring of what data is serialized, or through output channels Output Channels. However there may be the case where these types of outputs do not fit into your data science workflow. The below will detail a brief example using H2 - an in-memory relational database to explain how this can be accomplished.

Simudyne SDK as of version 2.4 makes usage of Eel a toolkit for manipulating data in the Hadoop ecosystem making usage of outputs in parquet, orc, csv in locations such as local, HDFS, or Hive tables. Existing users can output both local and/or distributed on a cluster, users will not need to change anything about their code in order to make usafe of this new output technology and enjoy the performance benefits that it provides.

However in order to develop a customized data pipeline there are additional options via usage of Eel being included in the SDK that users should feel free to take advantage of.

Explaining Source & Sink

The core data structure in Eel is a DataStream consisting of a Schema and n rooms containing values for each field in the schema. Conceptually, a DataStream is similar to a table in a normal reltational database.

With Eel, DataStreams can be read from a variety of Sources, and correspondingly can be written out to Sinks. These Source/Sink types can cary from Hive, JDBC Database, or collections in Java, JSON, or Parquet.

Supported Eel Projects

By default, Eel Core allows for the variety of basic Source/Sink formats such as Parquet/CSV/etc. We also include Hive and Orc to allow users to both natively work with Hive via our SDK or easily work with Orc formatted files. However, HBase, Kafka, and Kudu are not included. Should a user wish to work with these technologies they must do so at their own risk. If you wish to include these parts of the Eel SDK's libraries you should make sure to EXCLUDE both Hive/Hadoop/Log4J packages from that library in order to not cause issues with conflicting versions.

Here is a basic example of the code structure for a Source/Sink:

val source = CsvSource(new Path("input_historical.csv"))
val sink = ParquetSink(new Path("output_npv.pq"))
source.toDataStream().filter(_.get("npv") > 0).to(sink)

What's happening here is that we are reading in a csv file containing presumably multiple columns/rows of data. Then we define a Parquet output that will contain just the NPV value. We then take the source and create a DataStream, and then we can process that stream with actions such as map, filter, take, drop. Here we add a filter to output the stream only if the NPV value is greater than 0. If this is true it will output the contents of the row in the same schema format as the input, but to a parquet Sink via the .to() method.

Defining a Custom Schema

Note if you wish to make usage of Custom Output Channels please refer to our (Reference Docs) and use the standard Parquet or Hive outputs.

However, because of the versatility using Eel, it might be faster or make more sense to define a custom Source/Sink operation within your model that does not have to handle the serailization used by various Simudyne SDK structures. As such, please refer to this short example on how if using your own custom Source/Sink code on defining a schema.

val personDetailsStruct = Field.createStructField("PERSON_DETAILS",
  Seq(
	Field("NAME", StringType),
	Field("AGE", IntType.Signed),
	Field("SALARY", DecimalType(Precision(38), Scale(5))),
	Field("CREATION_TIME", TimestampMillisType)
  )
)
val schema = StructType(personDetailsStruct)

val rows = Vector(
  Vector(Vector("Fred", 50, BigDecimal("50000.99000"), new Timestamp(System.currentTimeMillis()))),
  Vector(Vector("Gary", 50, BigDecimal("20000.34000"), new Timestamp(System.currentTimeMillis()))),
  Vector(Vector("Alice", 50, BigDecimal("99999.98000"), new Timestamp(System.currentTimeMillis())))
)

This population of a the rows Vector can instead be done as an agent action within the step function.

static Action<Person> updateSalary() {
    return Action.create(
        Person.class,
        p -> {
			interim_row = Vector("Fred", 50, BigDecimal("50000.99000"), new Timestamp(System.currentTimeMillis()))
			rows.addAll(interim_row)
		}
	);
}

This example structure can then be written out to a Parquet Sink like this:

DataStream.fromValues(schema, rows).to(ParquetSink(parquetFilePath))

Creating an H2 Sink via JDBC

If you are working with H2 you will need to include the driver into your code by adding org.h2.Driver to your imports.

This example given a database string and a tableName will return a JdbcSink which as shown above can be the output. For H2 the database string should look something like this jdbc:h2:~/test

def createJDBCSink(dbName: String, tableName: String): JdbcSink = {
	val driver = "org.h2.Driver" 

	try {
	  Class.forName(driver)
	} catch {
	  case _: Throwable => println("JDBC Driver not Found");
	}

	val dataSource = new BasicDataSource()
	dataSource.setDriverClassName(driver)
	dataSource.setUrl(dbName)
	dataSource.setUsername("username1")
	dataSource.setPassword("password1")
	dataSource.setPoolPreparedStatements(false)

	new JdbcSink(() => dataSource.getConnection(), tableName)
}