Kotlin on Apache Hive – Intro

Why am I doing this?

For the most part, Hive has been a great way of keeping data engineering and analytics code as friendly and immediately usable as possible for less technical colleagues.  However, as your model grows, simple HiveQL alone will almost never be enough to both scale and get across the product finish line as quickly and often as possible.  As such, Kotlin has (so far!) been a pretty nice way to start extending functionality via Hive UDFs in a safe, easy, and fast way.  It’s also significantly easier to test with!

To organize my thoughts, I’m writing about a lot of the lessons I’ve learned and other random thoughts I’ve picked up while working Kotlin into a number of Hive projects.  Maybe someone else out there might even find this helpful too!

Setup

I’ve been working mostly with Hive on Amazon Elastic MapReduce on their 4.x releases.  This dictates the versions of Hive / Hadoop, but almost all of this should still apply if you’re running on different, newer versions internally.  Given that, these are the relevant specifications used for this demo:

  1. Apache Hive 1.0.0
    With some forward patches applied by Amazon when in EMR
  2. Apache Hadoop 2.6.0
    With some forward patches applied by Amazon when in EMR
  3. Kotlin 1.0.0
  4. Maven 3.2.1
  5. JDK 1.7.0_79
    Used for local builds / verification, but note that Kotlin also supports 1.6

What is a Hive UDF?

In Hive, a UDF (or User Defined Function) is an extension to the core HiveQL language that can be added at run-time via the JVM.  Hive ships with many built-in UDFs, the exact set depending on the version you’re running.  You may also add your own extensions by following a fairly simple process.

Note that you may often see people applying a transform map/reduce step that passes data to scripts outside of Hive (e.g., Python).  This is not a UDF!  Such a query might look something like this:

SELECT 
  TRANSFORM(user_id, a, bunch, of, cols, from, some_table)
  USING 'python my_cool_script.py'
  AS (user_id, separated, output, cols, from, my_cool_script)
FROM some_table;

This is potentially super nice if you know nothing about writing, building, and deploying to the JVM (which admittedly can be confusing)!  And, in some cases, if what you want to do can be fully contained in a single transform script pass, then this can be a great solution.  However, also note that:

  1. The Hive worker must de-serialize all data from its input into memory to start the job
  2. The Hive worker must re-serialize all of that data to the transform script's stdin
  3. The transform script must de-serialize the stdin stream in order to do its work
  4. The transform script must re-serialize all of its output to stdout to hand back to the Hive worker
  5. The Hive worker must de-serialize all data from the script's stdout into memory to continue the query

Hopefully it is obvious that this is a bananas amount of overhead.  And if you’re piping everything through a Python (or similar) script just so that you can transform one column with a familiar library that Hive doesn’t appear to support (I have seen this a lot!), then it is a *wildly* and unnecessarily bananas amount of overhead.

Additionally, note that:

  1. Your service provider (e.g., EMR, internal ops) must supply the exact versions and set of libraries you need for whatever is running your scripts.  If they won’t or can’t change these, you’ll be severely limited in how you can solve your problem.  Hive / Hadoop provides no means of doing this provisioning for you.  With UDFs, on the other hand, you can do your own provisioning by supplying your own jars!
  2. Your transform script’s platform may de-serialize many types in a fashion inconsistent with Hive (this is *very* difficult to track down once it has become a problem).  Additionally, (de-)serializing any complex types (e.g., structs, arrays) can be a headache to get just right when doing these exchanges.
  3. If your transform script has a problem, you *must* go to the individual worker log to get any details, not the job log.  On top of that, you must very much hope that your transform script’s engine writes useful details to stderr rather than stdout, which many tools unfortunately do not do.  This can be a very, very frustrating process for any user not deeply familiar with how a Hive deployment works.

The simplest UDF

To start, let’s build the simplest possible scalar UDF so that we can create an end-to-end path for additional development.  In this case, I’m going to use the term ‘scalar’ to mean that it operates in the context of a single row and never works across multiple rows in any way, whether by emitting or by consuming multiple rows of data.  This does not mean that the input or output of the UDF can’t be something like a list or map of data.  It just means that that list or map would be a complex-typed column of that singular row of data and not something that spans multiple rows in the table.  Those would have to be exploded out prior to being fed into something like an aggregation function.
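
To make that distinction concrete, here is a minimal (and purely hypothetical) sketch of a scalar UDF that consumes a complex-typed column.  The class name and behavior are my own invention, and it assumes Hive’s reflection-based UDF bridge will map an array<string> column onto a java.util.List here, so treat it as a sketch rather than tested code:

import org.apache.hadoop.hive.ql.exec.UDF

// Hypothetical sketch: a "scalar" UDF that consumes a complex-typed column.
// Each call receives the array<string> value from exactly one row and returns
// a single string for that same row; it never looks across rows.
class SandboxJoinUDF : UDF() {

    fun evaluate(input: List<String>?): String? {
        return input?.joinToString(separator = ",")
    }

}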

Our requirements will be to create a function ‘sandbox’ which takes an input string and returns the same string in all lowercase characters.  If it receives a NULL input, it will return a NULL response.  An example of using this function in a query would look like this:

SELECT sandbox('Test Me');
  ==> test me

An implementation of that might look something like this (bear with the Java syntax highlighting getting applied to Kotlin source for now please, WordPress doesn’t support Kotlin at the moment):

import org.apache.hadoop.hive.ql.exec.Description
import org.apache.hadoop.hive.ql.exec.UDF

@Description(
    name = "sandbox",
    value = "_FUNC_(x) - Takes in a string, returns downcased version of that string",
    extended = """
For example, you could do something like this:

  SELECT _FUNC_('Test Me');
    ==> test me
"""
)
class SandboxSimpleUDF : UDF() {

    fun evaluate(input: String?): String? {
        return input?.toLowerCase()
    }

}

Pretty easy, right?  I won’t go over all the Kotlin-specific details (try this great playpen out to get a feel for how Kotlin works), but I will go over the ones specifically relevant to Hive.

First off, the @Description annotation supplies metadata that Hive surfaces to the user via the DESCRIBE FUNCTION command.

The actual work here is performed by the evaluate method.  Note that this is *not* an override.  Hive conventionally looks for all methods named ‘evaluate’ at run-time, using reflection to figure out how to map query execution to this logic.  Note too that every worker will instantiate one (and exactly one!) new instance of the UDF object in order to call the appropriate evaluate method for that job, so in theory you could track some state from call to call, but don’t get too far ahead of yourself yet!
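
To make that last point concrete, here is a minimal (and purely hypothetical) sketch of per-instance state.  The class name and the counter are my own invention, and the counter only survives from row to row within a single worker’s instance, never across workers or queries:

import org.apache.hadoop.hive.ql.exec.UDF

// Hypothetical sketch only: each worker keeps a single instance of this class
// alive for the job, so callCount persists from row to row within that worker,
// but is *not* shared across workers or across queries.
class SandboxCountingUDF : UDF() {

    private var callCount = 0L

    fun evaluate(input: String?): String? {
        callCount += 1
        return input?.toLowerCase()
    }

}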

Note that evaluate must support NULL values coming in and going out because of our requirements, so the Kotlin input and output types must have the ? suffix added.  By additionally using ?. when calling into the input object, we’re saying that if input is null, the whole expression should simply evaluate to null.  It’s essentially a more succinct way of saying:

if(input == null) {
    return null
}
return input!!.toLowerCase()

We could go even further and write the one-liner as an expression-bodied function:

fun evaluate(input: String?): String? = input?.toLowerCase()

Personally I prefer the block-bodied syntax for readability, but it’s worth knowing that this is a stylistic option.  It might be useful if you need to overload a bazillion versions of the method, for instance.
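
For instance, here is a hypothetical sketch of a couple of expression-bodied overloads (the second signature and its behavior are my own invention, not part of the sandbox requirements).  Hive’s reflection-based resolver picks the overload whose parameter types best match the arguments in the query:

import org.apache.hadoop.hive.ql.exec.UDF

// Hypothetical sketch: two expression-bodied overloads of evaluate.
class SandboxOverloadedUDF : UDF() {

    // Downcase a string
    fun evaluate(input: String?): String? = input?.toLowerCase()

    // Downcase a string and truncate it to at most maxLength characters
    fun evaluate(input: String?, maxLength: Int?): String? =
        input?.toLowerCase()?.take(maxLength ?: Int.MAX_VALUE)

}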

Nice chars, but what good is it to me?

Now we’re left with how to verify and deploy this.  To begin with, let’s set up a Maven project to do the build.  Here I will use Apache Maven (homebrew installation instructions here) purely out of familiarity and because I could find ample Kotlin examples when I started (notably for Kotlin/Java cross-compile support).  If you prefer not to, there should be enough material out there now to use Gradle or any other build tool.

The pom.xml file I’ve used for this simplest case looks like this:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mergehead</groupId>
    <artifactId>kotlinhive-intro</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <name>Kotlin on Apache Hive - Intro</name>
    <description>Example code for post</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <kotlin.version>1.0.0</kotlin.version>
        <hadoop.version>2.6.0</hadoop.version>
        <hive.version>1.0.0</hive.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-stdlib</artifactId>
            <version>${kotlin.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>kotlin-maven-plugin</artifactId>
                <groupId>org.jetbrains.kotlin</groupId>
                <version>${kotlin.version}</version>

                <configuration/>
                <executions>
                    <execution>
                        <id>compile</id>
                        <phase>process-sources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile</id>
                        <phase>process-test-sources</phase>
                        <goals>
                            <goal>test-compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.8</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/jars</outputDirectory>
                            <includeArtifactIds>kotlin-stdlib,kotlin-runtime</includeArtifactIds>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

This should:

  1. Successfully build the new UDF for the Hive/Hadoop versions we discussed
  2. Generate a jar file at target/kotlinhive-intro-1.0.0.jar that can be loaded into a Hive shell
  3. Place two jar files in target/jars/, which are the Kotlin dependencies to be loaded into the Hive shell at run-time

All of this should build with a single command in your terminal from the project root directory (where your pom.xml file is):

mvn clean package

I know this looks like a lot of configuration to get started, but it makes things a great deal easier down the line when you start smashing together lots of other code, libraries, configuration, and external systems.  If you still can’t stand it, consider going the Gradle route instead!

Verify that we didn’t screw up

We can do a super fast verification of all this by launching a local-mode Hive shell on your machine (great instructions for installing on OS X are here; other platforms are but a google away).  Using the verification script found here, this is what executing the new UDF looks like (with a little whitespace added to the output for readability):

hive> ADD JAR jars/kotlin-runtime-1.0.0.jar;
Added [jars/kotlin-runtime-1.0.0.jar] to class path
Added resources: [jars/kotlin-runtime-1.0.0.jar]

hive> ADD JAR jars/kotlin-stdlib-1.0.0.jar;
Added [jars/kotlin-stdlib-1.0.0.jar] to class path
Added resources: [jars/kotlin-stdlib-1.0.0.jar]

hive> ADD JAR kotlinhive-intro-1.0.0.jar;
Added [kotlinhive-intro-1.0.0.jar] to class path
Added resources: [kotlinhive-intro-1.0.0.jar]

hive> CREATE TEMPORARY FUNCTION sandbox AS 'com.mergehead.kotlinhive.intro.SandboxSimpleUDF';
OK
Time taken: 0.342 seconds

hive> SELECT sandbox('Test Me');
OK
test me
Time taken: 0.333 seconds, Fetched: 1 row(s)

hive> DESCRIBE FUNCTION sandbox;
OK
sandbox(x) - Takes in a string, returns downcased version of that string
Time taken: 0.046 seconds, Fetched: 1 row(s)

hive> DESCRIBE FUNCTION EXTENDED sandbox;
OK
sandbox(x) - Takes in a string, returns downcased version of that string

For example, you could do something like this:

  SELECT sandbox('Test Me');
    ==> test me

Time taken: 0.027 seconds, Fetched: 7 row(s)

Woo!  Success!  Or at least I got a success; hopefully you did too.  Note that I have found that the order in which the Kotlin jar files are added sometimes causes problems (though I couldn’t reproduce it here).  This especially seemed to occur on older pre-release versions of Kotlin.  If you’re ever getting some really bizarre results when adding or calling the function, try reversing the order of the ADD JAR calls for the Kotlin dependencies.

You might want to play around with your new toy for a bit to see how it behaves.  Try passing in a NULL to make sure that works.  What happens if we pass an integer?  Is this ok for what you want?

What next?

I’m going to cut this off here for now as a completed product. You can find a full project implementation for all of this here: https://github.com/mergehead/kotlin-hive-intro

I’ll plan to follow up with at least the following topics for future articles:

  1. How to unit test Kotlin Hive UDFs in a sane fashion?
    Our verification step would not be fun for lots of changing UDFs
  2. Can we make more complicated scalar UDFs in Kotlin?
    Examples:  named structs in/out, scalar lists in/out, type overloading
  3. What would a UDTF look like in Kotlin?
    Make your own complex, custom explodes
  4. What would a UDAF look like in Kotlin?
    Aggregate all the things however you wish
  5. How do you create an operational model for Kotlin on Hive?
    Users shouldn’t have to know about adding jars and functions for ephemeral clusters if they can avoid it!  Persistent clusters should be able to both clearly represent their current versioning and be easily re-buildable should they fail.
  6. How do you do integration testing on Hive? (maybe!)
    What if we use more than just Hive?  Or want to verify full, abbreviated data flowing through a large set of scripts without allocating a bazillion expensive EC2 instances?

