How to work with many
In this series, we’ve now arrived here:
- Intro
- Unit Tests
- Complex UDF
- Complex UDF 2
We’ve just built the most useful method that I have ever seen. How could we top that? We’re going to give it multiple levels of responsibility at the same time, that’s how! Lets add type overloading, handle lists & maps, and then take a peak at what awaits us even further into the Hive UDF innards.
Giving it all you’ve got
If each UDF could only handle only one type of input and output, that would make many things super annoying. You’d need a int_sandbox, string_sandbox, array_sandbox, etc method for every possible type you’d ever want to handle. Plus you’d have to communicate all this to the user lest they think that they can’t do what they want! What a pain.
Lets instead allow for a single UDF to handle more than one input type. Here what this would look like:
import org.apache.hadoop.hive.ql.exec.Description
import org.apache.hadoop.hive.ql.exec.UDF
@Description(
name = "sandbox_overloaded",
value = "_FUNC_(x) - Takes a string or struct, returns downcased string or part of struct",
extended = """
For example, you could do something like this:
SELECT _FUNC_('Test Me');
==> test me
or this:
SELECT _FUNC_(NAMED_STRUCT('value', 'Test Me', 'otherValue', 123));
==> {'value': 'test me', 'othervalue': 123}
"""
)
class SandboxOverloadedUDF : UDF() {
class TestMe {
var value:String? = null
var otherValue:Int? = null
fun undefined():Boolean {
return value == null || otherValue == null
}
fun reset() {
value = null
otherValue = null
}
}
val testMe = TestMe()
fun evaluate(arg:TestMe?):TestMe {
if(arg == null || arg.undefined()) {
testMe.reset()
return testMe
}
testMe.value = arg.value?.toLowerCase()
testMe.otherValue = arg.otherValue
return testMe
}
fun evaluate(input:String?):String? {
return input?.toLowerCase()
}
}
This looks very similar to SandboxComplexUDF, but this one adds back in the simple string evaluate from our original case. It’s important that both methods have the exact same name, but the input parameters and return type can be whatever you wish. The UDF will then behave pretty much as expected when called from a shell. For example, see this experiment:
hive> ADD JAR jars/kotlin-runtime-1.0.1.jar;
Added [jars/kotlin-runtime-1.0.1.jar] to class path
Added resources: [jars/kotlin-runtime-1.0.1.jar]
hive> ADD JAR jars/kotlin-stdlib-1.0.1.jar;
Added [jars/kotlin-stdlib-1.0.1.jar] to class path
Added resources: [jars/kotlin-stdlib-1.0.1.jar]
hive> ADD JAR kotlinhive-complexudf-1.0.0.jar;
Added [kotlinhive-complexudf-1.0.0.jar] to class path
Added resources: [kotlinhive-complexudf-1.0.0.jar]
hive> CREATE TEMPORARY FUNCTION sandbox_overloaded AS 'com.mergehead.kotlinhive.complexudf.SandboxOverloadedUDF';
OK
Time taken: 0.354 seconds
hive> SELECT sandbox_overloaded('Test Me');
OK
test me
Time taken: 0.32 seconds, Fetched: 1 row(s)
hive> SELECT sandbox_overloaded(NAMED_STRUCT(
> 'value', 'Test Me',
> 'otherValue', 123
> ));
OK
{"value":"test me","othervalue":123}
Time taken: 0.046 seconds, Fetched: 1 row(s)
This can be useful if a user wants to perform some action, but doesn’t have a lot of awareness of underlying structure. For instance, you could use overloading to support different versions of a stream of data that has changed over time (e.g., from bug fixes, new features, etc). The user calling the method just knows they want to take an action on that stream and doesn’t care about what’s underneath. This way they don’t have to think since you just automatically map from their input type to the appropriate implementation. Useful!
See unit tests for this UDF here for more exhaustive use cases.
If it fits, it collects
What about when we want to work with more than one thing contained in a single row? With Hive, this can happen in three cases:
- Input is an
ARRAYtype - Input is a
MAPtype - Input has a variable number of arguments
(e.g.,GREATEST(1, 2, 3, ..., 50) => 50)
We’ll use this quick UDF to demo how to support these in Hive:
import org.apache.hadoop.hive.ql.exec.Description
import org.apache.hadoop.hive.ql.exec.UDF
@Description(
name = "sandbox_collections",
value = "_FUNC_(x) - takes in a collection of TestMes and works on all of them",
extended = """
For example, you could do something like this:
SELECT _FUNC_(NAMED_STRUCT('value', 'Test Me', 'otherValue', 123));
==> {'value': 'test me', 'othervalue': 123}
or this:
SELECT _FUNC_(ARRAY(NAMED_STRUCT('value', 'Test Me', 'otherValue', 123)));
==> [{'value': 'test me', 'othervalue': 123}]
or this:
SELECT _FUNC_(MAP('banana', NAMED_STRUCT('value', 'Test Me', 'otherValue', 123)));
==> {'banana': {'value': 'test me', 'othervalue': 123}]}
or this:
SELECT _FUNC_(
NAMED_STRUCT('value', 'Test Me', 'otherValue', 123),
NAMED_STRUCT('value', 'Test Me Too', 'otherValue', 456)
);
==> [{'value': 'test me', 'othervalue': 123}, {'value': 'test me too!', 'othervalue': 456}]
"""
)
class SandboxCollectionsUDF : UDF() {
class TestMe {
var value:String? = null
var otherValue:Int? = null
fun undefined():Boolean {
return value == null || otherValue == null
}
fun reset() {
value = null
otherValue = null
}
fun clone():TestMe {
val retVal = TestMe()
retVal.value = value
retVal.otherValue = otherValue
return retVal
}
}
val testMe = TestMe()
private fun mapStructValue(arg:TestMe?):TestMe {
if(arg == null || arg.undefined()) {
testMe.reset()
return testMe
}
testMe.value = arg.value?.toLowerCase()
testMe.otherValue = arg.otherValue
return testMe
}
fun evaluate(argList:List<TestMe?>?):List<TestMe>? {
if(argList == null) {
return null
}
return argList.map { mapStructValue(it).clone() }
}
fun evaluate(argMap:Map<String?, TestMe?>?):Map<String?, TestMe>? {
if(argMap == null) {
return null
}
return argMap.map { Pair(it.key, mapStructValue(it.value).clone()) }.toMap()
}
fun evaluate(vararg varArg:TestMe?):List<TestMe>? {
if(varArg == null) {
return null
}
return evaluate(varArg.toList())
}
}
So from the above list, note that Hive supports the following data type mappings in Kotlin:
- Hive
ARRAY=> KotlinList - Hive
MAP=> KotlinMap - Hive variable arguments => Kotlin vararg tag (which produces a typed
Array)
Pretty easy, no? However, do notice that we’ve had to abandon the attempts to reduce extra object allocations on each evaluation since we can no longer assume prior to execution how many I/O objects we’ll have each time. You can get around this a bit if you move to an object pooling model (e.g., Apache commons-pool), but I won’t go into this right now. Further, I would also recommend only bothering to go down that path if you are actually observing related performance problems. Object pools can be as big a pain as they are a solution if you’re not careful.
Test most of the things!
You can find tests for the various collection inputs here. The test suite gives loads of examples for you to play with if you wish, but I won’t got into detail here. However! Do note this new little tidbit:
override fun setupHQL() {
execute("""
CREATE TABLE special_values (
empty_array ARRAY<STRUCT<
value:STRING,
otherValue:INT
>>,
nulled_array ARRAY<STRUCT<
value:STRING,
otherValue:INT
>>,
empty_map MAP<STRING, STRUCT<
value:STRING,
otherValue:INT
>>,
nulled_map MAP<STRING, STRUCT<
value:STRING,
otherValue:INT
>>
)
""")
childHiveShell.insertInto("default", "special_values").addRow(
emptyArray<Any>(),
null,
emptyMap<Any, Any>(),
null
).commit()
}
The setupHQL method is a new hook added in TestBase to execute additional test specific setup once per shell. In this case, we’re using it to create a Hive table holding a single row where the columns represent all the special, typed values for our collections. The special values being when the input should be either NULL or an empty version of that Hive ARRAY or MAP. You may recall the previous discussion about how to type a literal NULL value statement and the problems that brings up. For instance, how do you think this statement should get mapped to an evaluate method in the above class:
SELECT sandbox_collections(NULL)
Which overloaded evaluate method do we map to? Who knows! Ambiguity does not rock. This might appear to work if (and only if) you happen to have an evaluate taking a single or variable arg String input, but it will only be working for that one method. You really do need to make sure that all entry points can pass the NULL and empty scenarios since it is always possible for them to come in via a table definition.
So, we shall force a typing for NULL and empty ARRAY / MAP values so that Hive can route to the appropriate evaluate method we need for the test. We can’t cast to the desired type like we did before for primitives (e.g., SELECT INT(NULL)), but we can force a type when creating a table schema as we do above. Then the Klarna HiveShell‘s helpers are used to add the data without having to worry about how to handle the SerDe encodings. Problem solved! Now we can use these as inputs to a test case like so:
@Test
fun nullInputList() {
assertEquals(
"NULL",
queryOne("SELECT sandbox_collections(nulled_array) FROM special_values")
)
}
@Test
fun emptyInputList() {
assertEquals(
"[]",
queryOne("SELECT sandbox_collections(empty_array) FROM special_values")
)
}
Safety and security at last!
But why make it easy?
Everything we’ve been doing so far covers up the inner workings of Hive using reflection magic behind the scenes. This is great when what you want to implement is relatively simple conceptually, but as we move on to the non-scalar UDFs this won’t be an option. So lets get a preview of what building a generic UDF (some background here) would look like.
We’ll plan to build a UDF as close to the original SandboxComplexUDF as possible but using the GenericUDF base class instead of UDF. Here’s the code here:
import org.apache.hadoop.hive.ql.exec.Description
import org.apache.hadoop.hive.ql.exec.UDFArgumentException
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
@Description(
name = "sandbox_complex_generic",
value = "_FUNC_(x) - Takes a struct and returns a downcased part of that struct",
extended = """
For example, you could do something like this:
SELECT _FUNC_(NAMED_STRUCT('value', 'Test Me', 'otherValue', 123));
==> {'value': 'test me', 'othervalue': 123}
"""
)
class SandboxComplexGenericUDF : GenericUDF() {
class TestMe {
var value:String? = null
var otherValue:Int? = null
fun undefined():Boolean {
return value == null || otherValue == null
}
fun reset() {
value = null
otherValue = null
}
}
val testObj = TestMe()
var argOI:SettableStructObjectInspector? = null
val outputOI = ObjectInspectorFactory.getReflectionObjectInspector(
TestMe::class.java,
ObjectInspectorFactory.ObjectInspectorOptions.JAVA
)
val converter by lazy { ObjectInspectorConverters.getConverter(argOI, outputOI) }
override fun evaluate(args:Array<out DeferredObject>?):Any? {
if(args?.size != 1) {
testObj.reset()
return testObj
}
val obj = converter!!.convert(args!!.first().get()) as TestMe
if(obj.undefined()) {
testObj.reset()
return testObj
}
testObj.value = obj.value?.toLowerCase()
testObj.otherValue = obj.otherValue
return testObj
}
override fun initialize(args:Array<out ObjectInspector>?):ObjectInspector? {
if(args?.size != 1 || args!![0] !is StructObjectInspector) {
throw UDFArgumentException("SandboxComplexGenericUDF expects 1 struct argument")
}
argOI = args[0] as SettableStructObjectInspector
return outputOI
}
override fun getDisplayString(args:Array<out String>):String? {
return "sandbox_complex_generic(" + args[0]+ " )";
}
}
Pretty different, huh?
First note that we now have an object inspector (OI) object acting as a broker between all values in the UDF and the wire (i.e., what’s coming in from somewhere else in the cluster and going back out to the cluster). We need to be careful in our initialize method that we can:
- Get an OI for the input args appropriate for what we expect
(e.g., do you want an int? a map?) - Legally create an OI for the value we expect to return
(errors might occur if Hive doesn’t yet support a mapping for the data type you want to use) - Convert from the input OI to our in-memory model object
(mostly important for going from a struct to a Java POJO)
All of our reflection based UDFs were getting this infrastructure for free but at the cost of using Java reflection (very expensive!). This isn’t a big deal for scalar UDFs, but it can become a significant cost with UDAFs and UDTFs.
Our case here is still pretty straightforward compared to the first version, but this could definitely grow quite large if we had to deal with a ton of different complex I/O types! Know your options and apply the best when the situation demands it.
Differences versus reflection
The full test file for this is very much like the original for SandboxComplexUDF but has a few additions. You can find it here. Note the following tests:
@Test
fun firstMemberWrongType() {
assertEquals(
TestMe("123", 123),
queryForClass("""
SELECT sandbox_complex(
NAMED_STRUCT(
'value', 123,
'otherValue', 123
)
)
""")
)
}
@Test
fun secondMemberWrongType() {
assertEquals(
TestMe(null, null),
queryForClass("""
SELECT sandbox_complex(
NAMED_STRUCT(
'value', 'Hola',
'otherValue', 'banana'
)
)
""")
)
}
@Test(expected = IllegalArgumentException::class)
fun firstMemberHardcoreWrongType() {
assertEquals(
TestMe(null, null),
queryForClass("""
SELECT sandbox_complex(
NAMED_STRUCT(
'value', ARRAY('Hola', 'Hola', 'Hola'),
'otherValue', 123
)
)
""")
)
}
@Test(expected = IllegalArgumentException::class)
fun secondMemberHardcoreWrongType() {
assertEquals(
TestMe(null, null),
queryForClass("""
SELECT sandbox_complex(
NAMED_STRUCT(
'value', 'Hola',
'otherValue', ARRAY(123, 456, 789)
)
)
""")
)
}
These show that the generic version is treating unexpected types significantly different than in the reflection based UDFs. Especially in the case of the integer member, you’ll notice that when it can’t convert the string input struct’s type to an integer it will instead assign a NULL value. This causes the original design of the UDF to NULL out the entire return struct instead of just that member. Is this really what we want? You’ll have to decide depending on your situation!
Done with basic UDFs!
Find the complete code for this post in the same repo as the last here.
This completes all we will get into for now in regards to implementing a Hive scalar UDF using Kotlin. You should have all you need to get started in these last few posts.
Next post will be on our first example of implementing a UDTF.