Intro to stepping up
In this series, we’ve now arrived here:
- Intro
- Unit Tests
- Complex UDF
Now that we’re armed with a way to do fast development of a UDF, let’s step up our game a bit. The String input we used in the simple case shows that we can easily handle any primitive type: Hive automatically maps its types to the appropriate Java primitives (e.g., int, long, String, float, double). What isn’t as immediately obvious is that Hive will also automatically map simple POJO-style object parameters, lists (mapped from Hive arrays) of primitives or POJOs, and maps. Additionally, Hive lets you overload the evaluate method so that each allowed input type gets its own special handling.
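Here is a rough illustration of that overloading. It is a minimal sketch that assumes Hive resolves evaluate overloads by argument type, as described above. DescribeUDF is a hypothetical name. The class deliberately leaves out the UDF superclass so it compiles with only the Kotlin standard library.

```kotlin
// Hypothetical sketch: a real Hive UDF would extend
// org.apache.hadoop.hive.ql.exec.UDF; the superclass is left out here so the
// example compiles with only the Kotlin standard library.
class DescribeUDF {
    // Hive STRING -> String
    fun evaluate(arg: String?): String? = arg?.let { "string of length ${it.length}" }

    // Hive INT -> Int (nullable, so a NULL input can come back as NULL)
    fun evaluate(arg: Int?): String? = arg?.let { "int $it" }

    // Hive ARRAY<STRING> -> List<String>
    fun evaluate(arg: List<String>?): String? = arg?.let { "array of size ${it.size}" }

    // Hive MAP<STRING, INT> -> Map<String, Int>
    fun evaluate(arg: Map<String, Int>?): String? = arg?.let { "map of size ${it.size}" }
}
```

Each overload has a distinct JVM signature (String, Integer, List, Map), so they can coexist on one class. The nullable parameter types mirror the defensive style used later in this post.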
Note that although we’re making the plain old UDF more complex, we’re still only dealing with a scalar input / output method (i.e., data from a single row in, data into a single row out). These are meant more as convenience methods so that you don’t get locked into only using simple SQL when you need to do much more complex interactions with the data. We’ll get to understanding the more advanced UDAF and UDTF versions of the UDF in a later post.
What if we want structure?
So, let’s build a UDF that can take a Hive named struct as input. With a named struct, Hive can map the provided fields to the appropriate places in the Java object using reflection. You could use a plain Hive struct without naming the fields, but then values are assigned based on their order in the struct. That lack of semantic labeling can create bugs down the line. For instance, if you swapped the order of two members of the same type, the plain struct version wouldn’t raise an error, while the named struct would (since the field names no longer line up in that order). It’s nice to have safety!
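Here is a toy model in plain Kotlin that makes that failure mode concrete. It is not how Hive binds struct fields internally. FullName, bindByPosition, and bindByName are hypothetical names, used only to contrast blind positional binding with a check on field names.

```kotlin
// Toy model, not Hive's binding code: contrasts trusting value order with
// checking field names at each position before binding.
data class FullName(val first: String, val last: String)

// Positional binding: whatever comes first becomes `first`, no questions asked.
fun bindByPosition(values: List<String>): FullName = FullName(values[0], values[1])

// Named binding: the labels must line up with the expected fields, or it fails.
fun bindByName(fields: List<Pair<String, String>>): FullName {
    val expected = listOf("first", "last")
    val actual = fields.map { it.first }
    require(actual == expected) { "expected fields $expected but got $actual" }
    return FullName(fields[0].second, fields[1].second)
}
```

Both members are Strings, so nothing in the types can catch a swap. Only the labels can.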
Here’s the code we’ll use today:
import org.apache.hadoop.hive.ql.exec.Description
import org.apache.hadoop.hive.ql.exec.UDF
@Description(
name = "sandbox_complex",
value = "_FUNC_(x) - Takes a struct and returns a downcased part of that struct",
extended = """
For example, you could do something like this:
SELECT _FUNC_(NAMED_STRUCT('value', 'Test Me', 'otherValue', 123));
==> {'value': 'test me', 'othervalue': 123}
"""
)
class SandboxComplexUDF : UDF() {
class TestMe {
var value:String? = null
var otherValue:Int? = null
fun undefined():Boolean {
return value == null || otherValue == null
}
fun reset() {
value = null
otherValue = null
}
}
val testMe = TestMe()
fun evaluate(arg:TestMe?):TestMe {
if(arg == null || arg.undefined()) {
testMe.reset()
return testMe
}
testMe.value = arg.value?.toLowerCase()
testMe.otherValue = arg.otherValue
return testMe
}
}
First, note that our POJO lacks the usual Kotlin primary constructor for its members. This is because Hive always calls a default (no-argument) constructor so that it has a single instance it can update via setter functions. Hive bends over backwards to allocate as few objects as possible. When a deserializer is reading your data in for a single worker, it’s likely that only a single copy of the class TestMe will ever get created. Hive then updates that one object’s members using the implied setters and repeatedly passes the same object to the evaluate method for each call. Given the data volumes you’re usually dealing with if you’re bothering to use a Hadoop system, this can dramatically reduce how often garbage collection runs.
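Here is a rough model of that reuse. It assumes setters are the mechanism, as described above. Hive’s real deserializers work through their own ObjectInspector machinery, so treat this as a toy. The feedRows helper is hypothetical. It uses plain Java reflection to create one TestMe through its no-arg constructor and then re-populate it for every row.

```kotlin
// Toy model of the object reuse described above, NOT Hive's actual
// deserializer: one TestMe is created through its no-arg constructor, then
// its setters are invoked again for every row before evaluate() sees it.
class TestMe {
    var value: String? = null
    var otherValue: Int? = null
}

fun feedRows(rows: List<Pair<String?, Int?>>, evaluate: (TestMe) -> Unit) {
    val clazz = TestMe::class.java
    // A single allocation for the whole batch of rows.
    val instance = clazz.getDeclaredConstructor().newInstance()
    // Kotlin `var` properties compile to JavaBean-style setters.
    val setValue = clazz.getMethod("setValue", String::class.java)
    val setOtherValue = clazz.getMethod("setOtherValue", Int::class.javaObjectType)
    for ((value, otherValue) in rows) {
        setValue.invoke(instance, value)
        setOtherValue.invoke(instance, otherValue)
        evaluate(instance)
    }
}
```

Two rows go in, but both callbacks receive the very same object. Only its member values differ between calls.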
Next, note that we create our own single instance of our I/O class for the return value. We take the values from the single input object and map them onto the single output object, which we repeatedly return for serialization to the next worker. Since every worker is assumed to be single-threaded by design, this is actually safe. That holds despite nearly every Java textbook calling you mad for designing your objects this way.
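The flip side of that reuse is worth seeing in isolation. In this hypothetical sketch, Output and ReusingEvaluator are illustrative names. evaluate() hands back the same object every time, so any caller that holds on to a result across calls sees it overwritten. Copying out the values you need is the safe pattern.

```kotlin
// Illustration of the reuse pattern: evaluate() always returns the same
// output object and just overwrites its members on each call.
class Output {
    var value: String? = null
}

class ReusingEvaluator {
    private val out = Output()

    fun evaluate(input: String?): Output {
        out.value = input?.trim()
        return out
    }
}
```

This is fine when the result is serialized before the next call, as in Hive. It bites as soon as something keeps references around.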
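The rest of this should look pretty familiar compared to our simple case. The ‘value’ member is downcased just as in the simple sandbox UDF, while ‘otherValue’ is passed straight through. If the struct itself or either of its members is NULL, we return the output object with both members reset to NULL.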
What is in a struct?
In Hive, a simple struct is an ordered set of values that are not required to be the same type. This is similar to an array, but Hive arrays require that all entries be the same type. Struct members are also accessed by field name (e.g., my_struct.value) rather than by a non-negative integer index, as array elements are.
I’ve differentiated a “named struct” in the last section, but technically all structs are named. For example, see:
SELECT STRUCT(1,"Hello",57.5);
==> {"col1":1,"col2":"Hello","col3":57.5}
So there’s an implicit name even when we don’t provide one.
What’s more, structs are serialized exactly as a set of column values (i.e., a series of separated values). So they don’t take up any more space on disk than simply adding more columns to the table would, because the names live only in the schema and the query, not in the stored data. What a struct does provide is a way to refer to a group of columns as one concept, which can then be passed to a function, sorted, etc. as a whole.
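To see why the names cost nothing on disk, here is a sketch that splits one stored text row. It assumes Hive’s default text format (LazySimpleSerDe) with its default delimiters: \u0001 between columns and \u0002 between the members of a struct column. It ignores escaping and NULL markers. splitRow is a hypothetical helper.

```kotlin
// Assumption: Hive's default text format with default delimiters, where
// \u0001 separates top-level columns and \u0002 separates the members of a
// struct stored in a column. Escaping and "\N" NULL markers are ignored.
fun splitRow(line: String): List<List<String>> =
    line.split('\u0001').map { column -> column.split('\u0002') }
```

Consider a table with an INT column followed by a STRUCT<value:STRING, otherValue:INT> column. Only the values appear in the row. The member names come from the table schema at read time.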
The downside is that you cannot put a NULL struct value into a table. The deserializer expects to get the full set of separated values in what it’s reading so that it can preserve the overall table structure. Otherwise, what’s the difference between the first member of the struct being NULL and the entire struct being NULL? So the best you can do with a table is get back a struct column filled with typed NULL values for all its members. This is very important to know when designing a UDF! From a table, your input will never be NULL for the struct itself, but any of its members may be NULL. However, if you’re feeding structs into a UDF in a case like a LEFT JOIN, you can absolutely get a NULL-valued struct! So make sure your UDFs are very defensive in their expectations.
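Here is a minimal sketch of the cases a defensive UDF has to tell apart. StructArg is a hypothetical data-class stand-in for TestMe, and classify is illustrative only. Rows read straight from a table can at worst produce ALL_MEMBERS_NULL, while a LEFT JOIN can hand you NULL_STRUCT.

```kotlin
// Hypothetical stand-in for the UDF's TestMe input.
data class StructArg(val value: String?, val otherValue: Int?)

enum class StructState { NULL_STRUCT, ALL_MEMBERS_NULL, SOME_MEMBERS_NULL, COMPLETE }

fun classify(arg: StructArg?): StructState = when {
    // Only possible from things like outer joins, never straight from a table.
    arg == null -> StructState.NULL_STRUCT
    // What a "NULL" struct read from a table actually looks like.
    arg.value == null && arg.otherValue == null -> StructState.ALL_MEMBERS_NULL
    arg.value == null || arg.otherValue == null -> StructState.SOME_MEMBERS_NULL
    else -> StructState.COMPLETE
}
```

The SandboxComplexUDF above folds the first three states into one case by checking arg == null || arg.undefined().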
As a final note, notice that a STRUCT declaration in a table schema is, confusingly, the equivalent of a NAMED_STRUCT() call in a query and not of STRUCT(). So whether you want to pass a struct to a function expecting an object or build a struct for a table column, use a named struct call to make sure everybody gets what they want. Again, there is a bonus: suppose you change the column order in the table schema but not in the query. The query compiler will catch this and refuse to run because of the added semantics, even if all members are of the same type.
What is Jackson?
Jackson is a Java library that provides very fast parsers for a variety of data encodings, most notably JSON. Their original tagline was “best JSON parser for Java”, so you know they’re also a very humble bunch.
However, we’re living in Kotlin now! So what we really want to use is the Jackson Kotlin module. It adds support for serializing and deserializing Kotlin classes, including data classes that have no default constructor. It also lets Jackson take advantage of Kotlin’s richer type system (nullability, reified generics) over Java’s. This makes working with Jackson significantly easier, since you juggle types a lot less when the module figures things out for you from context.
More infrastructure!
Now that we’ve created our new UDF, let’s test it! But this time it will be more complicated, right? We have a struct coming back rather than a simple primitive, so we need to validate the actual structure in addition to the values to make sure this is working properly. For that, let’s add some snazzy new helper code to the base class so that we can parse the serialized values returned by Hive.
Before we do anything, let’s look at how Hive actually encodes a structured result:
hive> SELECT NAMED_STRUCT('HowMany', 123, 'Howdy', 'neighbor');
OK
{"howmany":123,"howdy":"neighbor"}
Time taken: 0.768 seconds, Fetched: 1 row(s)
Well that looks just like JSON! That’s handy considering we were just talking about Jackson. And in fact it’s very, very close to JSON. However, there are some differences. Notably, check out this map:
hive> SELECT MAP(1, 'one', 2, 'two');
OK
{1:"one",2:"two"}
Time taken: 0.044 seconds, Fetched: 1 row(s)
That’s a completely legitimate Hive object mapping integer keys to string values. However, it is very much not a valid JSON object, since JSON object keys must always be strings (see the JSON spec). For the most part, things like this won’t affect your tests if you carefully design the data structures you’re working with. Jackson does supply a parser option on an ObjectMapper (here called mapper) that allows unquoted field names:
import com.fasterxml.jackson.core.JsonParser
mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true)
However, note that this still treats all object keys as strings. If you’re trying to verify that type is preserved in the map keys, that will be tricky to do with an approach like this. Be careful!
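If you do need to check that INT keys stay INT, one option is a narrow hand-rolled parser for test output instead of Jackson. This sketch uses parseIntStringMap, a hypothetical helper. It only handles flat maps with integer keys and simple double-quoted string values that contain no quotes or escapes.

```kotlin
// Hypothetical test helper: parses a flat Hive map such as {1:"one",2:"two"}
// into Map<Int, String>, keeping the keys as Ints. Values must not contain
// double quotes or escape sequences.
fun parseIntStringMap(hiveOutput: String): Map<Int, String> {
    val trimmed = hiveOutput.trim()
    require(trimmed.startsWith("{") && trimmed.endsWith("}")) { "not a Hive map: $hiveOutput" }
    // One entry: an optionally negative integer key, a colon, a quoted value.
    val entry = Regex("(-?\\d+):\"([^\"]*)\"")
    return entry.findAll(trimmed).associate { m ->
        m.groupValues[1].toInt() to m.groupValues[2]
    }
}
```

The regex silently skips anything that doesn’t look like an entry, so keep this for tightly controlled test data.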
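Given all that, here is the updated base class. Relative to the last post, the new pieces are:
- the Jackson imports and the JSON_MAPPER companion object
- the queryForJSON method
- the logging imports, plus the ConstantPropagateProcFactory log-level tweak in prepare()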
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.klarna.hiverunner.HiveShell
import com.klarna.hiverunner.StandaloneHiveRunner
import org.apache.commons.logging.LogFactory
import org.apache.commons.logging.impl.Log4JLogger
import org.apache.hadoop.hive.ql.optimizer.ConstantPropagateProcFactory
import org.apache.log4j.Level
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNotNull
import org.junit.Before
import org.junit.runner.RunWith
import kotlin.reflect.KClass
@RunWith(StandaloneHiveRunner::class)
abstract class TestBase(val methodName:String, val classToTest:KClass<*>) {
companion object {
val JSON_MAPPER = jacksonObjectMapper()
}
var setupComplete = false
/**
* The HiveShell *must* be declared on the child class for Klarna's HiveRunner to work properly via reflection.
* By convention here, we will expect that all our own child test classes use the field name "hiveShell" for
* convenience.
*/
val childHiveShell by lazy {
ReflectUtils.getFieldValue(this, "hiveShell") as HiveShell
}
fun execute(str:String) {
childHiveShell.execute(str)
}
fun query(queryStr:String):List<String> {
return childHiveShell.executeQuery(queryStr)
}
fun queryOne(queryStr:String):String? {
val results = query(queryStr)
assertNotNull("Hive should not provide a null response!", results)
assertEquals("Expected exactly 1 result!", 1, results.size)
return results.first()
}
/**
* By using Kotlin's reified types, this allows Jackson to just figure out what you expect at runtime and apply
* the correct mappings between the serialized JSON and your expected type. This won't always work, but it's
* pretty convenient for quick solutions (especially in tests).
*/
inline fun <reified T : Any> queryForJSON(queryStr:String):T? {
val results = query(queryStr)
if(results.size > 1) {
throw RuntimeException("Expected zero or one result, got ${results.size}")
}
if(results.size == 0 || "null".equals(results.first(), ignoreCase = true)) {
return null
}
return JSON_MAPPER.readValue(results.first())
}
@Before
fun prepare() {
if(!setupComplete) {
// Quick hack to remove all the annoying, innocuous ERROR lines from test output
(LogFactory.getLog(ConstantPropagateProcFactory::class.java.name) as Log4JLogger).logger.level = Level.FATAL
execute("CREATE TEMPORARY FUNCTION $methodName AS '${classToTest.qualifiedName}'")
setupComplete = true
}
}
We add the queryForJSON method, which expects at most one result from Hive and then attempts to marshal it to whatever type is expected in the context where it’s called. This shows off one of Kotlin’s strong points. The Jackson Kotlin module’s readValue extension can inspect the reified generic type of the return value as though it had been passed in as a parameter. Java generics can’t do this, because their type parameters are erased at runtime! So you can just say “read this JSON into this type” and Jackson will figure out what to do. Very simple!
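The same mechanism can be seen without Jackson. In this sketch, firstOfType is a hypothetical helper. The reified T survives to runtime, so the function can test each element against T. It picks its target type purely from the declared type of the variable it is assigned to, much as queryForClass does with queryForJSON.

```kotlin
// Because T is reified, `it is T` is a real runtime check. A plain Java
// generic method could not do this, since T would be erased.
inline fun <reified T : Any> firstOfType(values: List<Any?>): T? =
    values.firstOrNull { it is T } as T?
```

The caller never names the type argument explicitly. The compiler infers it from the expected type at the call site, as in val i: Int? = firstOfType(mixed).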
The other new part is tweaking the log level for ConstantPropagateProcFactory. When a query contains a constant value (i.e., anything that would always be passed as the same value to all workers), Hive passes it through that factory for special handling. If it’s a primitive value, everything is wonderful and happy. If it’s something more complex, like a map or a struct, the factory has to fall back to a different method of encoding it. For some reason, they decided to log that fallback at ERROR level when it should probably be something more like WARN. This can be very confusing in test output: the test reports that it passed while Hive spews out a ton of ERROR lines. We’ll be using constant structs as the input to our UDFs in tests, so let’s avoid the confusion!
Little one on the way
We have one new dependency to add for testing! The Kotlin Jackson module will be added like this:
<project>
...
<dependencies>
...
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-kotlin</artifactId>
<version>2.7.1-2</version>
<scope>test</scope>
</dependency>
</dependencies>
...
</project>
This will automatically pull in all the other Jackson dependencies we’ll need. Again, note that we’re specifying a test scope so that the large number of dependencies Jackson would otherwise pull in don’t have to travel along when we deploy later.
Also, note that the -N build suffix in the version (the -2 in 2.7.1-2) is tightly paired to Kotlin releases. If you bump your Kotlin compiler version, you’ll likely have to change that build number to link against it properly. When in doubt, refer to the module’s root README, which generally does a good job of explaining how to get your environment working.
Does that structure work?
Now lets put all that new infrastructure to work verifying our new UDF:
import com.klarna.hiverunner.HiveShell
import com.klarna.hiverunner.annotations.HiveSQL
import org.junit.Assert.assertEquals
import org.junit.Test
class SandboxComplexUDFTest : TestBase("sandbox_complex", SandboxComplexUDF::class){
@Suppress("unused")
@field:HiveSQL(files = arrayOf())
var hiveShell:HiveShell? = null
/**
* Note that hive will downcase all field names when serializing, so these might not match all your camelcase names
* for the actual UDF members.
*/
data class TestMe(val value:String?, val othervalue:Int?)
fun queryForClass(queryStr:String):TestMe? {
return queryForJSON(queryStr)
}
@Test
fun basicStructInput() {
assertEquals(
TestMe("test me", 123),
queryForClass("""
SELECT sandbox_complex(
NAMED_STRUCT(
'value', 'Test Me',
'otherValue', 123
)
)
""")
)
}
@Test
fun nullValue() {
assertEquals(
TestMe(null, null),
queryForClass("""
SELECT sandbox_complex(
NAMED_STRUCT(
'value', STRING(NULL),
'otherValue', 123
)
)
""")
)
}
@Test
fun nullOtherValue() {
assertEquals(
TestMe(null, null),
queryForClass("""
SELECT sandbox_complex(
NAMED_STRUCT(
'value', "Test Me",
'otherValue', INT(NULL)
)
)
""")
)
}
@Test
fun nullBoth() {
assertEquals(
TestMe(null, null),
queryForClass("""
SELECT sandbox_complex(
NAMED_STRUCT(
'value', STRING(NULL),
'otherValue', INT(NULL)
)
)
""")
)
}
@Test(expected = IllegalArgumentException::class)
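fun secondMemberWrongType() {
// 'otherValue' maps to an Int member, so a STRING value must be rejected
queryForClass("""
SELECT sandbox_complex(
NAMED_STRUCT(
'value', 'Hola',
'otherValue', 'banana'
)
)
""")
}
@Test(expected = IllegalArgumentException::class)
fun firstMemberWrongType() {
// 'value' maps to a String member, so an INT value must be rejected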
queryForClass("""
SELECT sandbox_complex(
NAMED_STRUCT(
'value', 123,
'otherValue', 123
)
)
""")
}
}
See how readable those tests are? It’ll be very easy for any co-developers or future devs to come in, understand exactly what was expected, and what was executed.
To provide that, note how our new queryForClass method does almost nothing but proxy the base class’ queryForJSON method along with a different return type. Because of the reified generic in the base method, this means that the Jackson Kotlin module will just figure out what to do given the context of the return value. In this case, it’ll look for a JSON object with the keys “value” and “othervalue” and instantiate a TestMe object out of it. If for whatever reason it can’t do that (e.g., it got a list of TestMe type objects back), then it will throw an error explaining why. Convenient!
Also, see how we’re using Kotlin’s triple-quoted, multi-line strings so that we can format our more complex queries as natural SQL? This keeps us from having 1000-character-wide lines of SQL to inspect if something goes wrong with the test. I generally find that I can glance at a well-formatted query and get to the “whoops!” fast when a problem occurs. A large one-line query almost always requires me to copy and paste it into an editor and reformat it myself before I can understand it. Even a multi-line chain of quote, plus, quote, plus, etc. can be very cumbersome to read with all the extra clutter. And many devs end up writing the annoying one-liners specifically to avoid that clutter! Dealing with that is way too time-consuming when there are lots of broken tests.
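You may also want the SQL in failure output to line up the way it reads in the source. Kotlin’s trimIndent() strips the common leading indentation, plus the blank first and last lines, from a raw string. formattedQuery is a hypothetical example. Hive itself ignores the whitespace either way.

```kotlin
// trimIndent() removes the smallest common indentation from every line and
// drops the blank first and last lines of the raw string.
fun formattedQuery(): String = """
    SELECT sandbox_complex(
        NAMED_STRUCT(
            'value', 'Test Me',
            'otherValue', 123
        )
    )
""".trimIndent()
```

The nesting inside the query is preserved; only the indentation shared by every line is removed.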
You might also notice something funny in the way I’m making a number of STRING(NULL) and INT(NULL) calls within the test SQL. Though it might not be obvious, Hive is very strongly typed. It can be frustratingly demanding about types lining up exactly, even when a conversion is obvious from context. Suppose you declare a table like:
CREATE TABLE cool_table ( int_value INT, str_value STRING )
Then if you say:
SELECT
sandbox_complex(NAMED_STRUCT(
'value', str_value,
'otherValue', int_value
))
FROM cool_table
You’d get appropriately typed NULL values (i.e., string for string, int for int) from the table schema, since it provides context. However, if you have a naked SELECT NULL, what type of NULL is that? In many cases it will default to, or auto-convert to, a string-typed NULL. I’ve come to prefer thinking of it as a void type, though, since that forces me to always provide a type cast to make clear what was meant. This is easy-ish for primitives, but it introduces a world of hurt for complex types (e.g., structs, lists, maps), since you can’t cast to those! Hopefully I can dive into ways around that in another post (e.g., UDFs that produce a typed complex NULL).
Finally, note our last two tests. They show a very different situation from our original simple UDF. When Hive maps a named struct to a Java POJO, it absolutely requires that the types match. So if we pass an integer to our string member, we get an error. If we pass a string (even one that encodes an integer!) to our integer member, we also get an error. It’s important to understand this difference. Users who are used to the more forgiving conversions of simpler UDFs may expect the same behavior here.
I won’t show you the test output again, since I went over what that looks like in the “Unit Tests” post. However, if you run mvn clean test on the command line or use IDEA to run the tests from the full code for this post, you should get a nice, clean success! Celebrate!
Wow, complex things are long
Much longer than I originally intended! So I’m going to break this into at least two posts so that they don’t get too overwhelming. You can find the full code for this post (and the next posts when we get there) here.
In the next section(s) you can expect:
- Overloaded evaluate examples
- List and map mapping examples
- Variable number of parameters example
- A first generic UDF example
Hope to see you there!