-
Notifications
You must be signed in to change notification settings - Fork 0
/
TreasureData.kt
69 lines (63 loc) · 2.07 KB
/
TreasureData.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import com.treasuredata.client.*
import com.google.common.base.Function
import org.msgpack.core.MessagePack
import org.msgpack.core.MessageUnpacker
import org.msgpack.value.ArrayValue
import org.msgpack.core.MessageFormat
import com.treasuredata.client.model.*
import java.io.InputStream
import java.io.File
import java.io.PrintWriter
import kotlin.String
import com.google.gson.Gson
import com.google.gson.GsonBuilder
/** Shared Gson instance used to serialise each exported row. */
val gson = Gson()

/**
 * Shared writer for the export file `download.json` (one JSON document per line).
 *
 * The charset is pinned to UTF-8: the single-argument `PrintWriter(String)` constructor encodes
 * with the platform default charset, which would make the file's encoding depend on the machine
 * the export runs on. The file is opened (and truncated) as soon as this property is initialised.
 */
val pwriter = PrintWriter("download.json", "UTF-8")
/**
 * Writes one result row to the shared [pwriter] as a single JSON line.
 *
 * Every element of [array] is converted with `toString()`. The row is skipped when the element
 * at index 6 stringifies to `"null"`; otherwise the list of strings is serialised with [gson]
 * and appended, followed by a newline.
 *
 * NOTE(review): index 6 is presumably the "tuuid" column of the query result; confirm against
 * the SQL. Rows with fewer than 7 elements raise an IndexOutOfBoundsException.
 *
 * @param array one unpacked result row.
 */
fun f1( array: ArrayValue ) {
    val columns = array.toList().map { value -> value.toString() }
    // Guard clause: rows whose 7th column is "null" are not exported.
    if (columns[6] == "null") return
    pwriter.append("${gson.toJson(columns)}\n")
}
/** Closes the shared [pwriter], flushing anything still buffered to `download.json`. */
fun close() = pwriter.close()
/**
 * Prints [t] followed by a line break to standard error.
 *
 * @param t value to print.
 */
fun <T> printerr(t: T) = System.err.println(t)
/**
 * Entry point: runs the Hive query stored in `kotlinDriver.sql` against the `tech_batch`
 * database and streams every result row through [f1].
 *
 * Steps:
 *  1. Create a client and print the name of every database it lists.
 *  2. Submit the query and poll the job status every 500 ms until it reports finished.
 *  3. Print the job's command output and stderr log.
 *  4. Unpack the result rows one by one and hand each to [f1].
 *
 * The output writer and the client are released in a `finally` block, so rows already written
 * are flushed to disk even when polling or unpacking fails part-way through.
 *
 * @param args command-line arguments (unused).
 */
fun main(args: Array<String>) {
    val client = TDClient.newClient()
    try {
        println("Start connecting to TreasureData Database.")
        val names: List<String> = client.listDatabases().map { db -> db.getName().toString() }
        names.forEach { name -> println("There is Database of ${name}.") }

        // Presto is roughly 3x faster than Hive, but it runs badly short of memory and dies
        // at around 40G ~ 80G, so the Hive engine is used instead.
        //val jobId = client.submit(TDJobRequest.newPrestoQuery("dac_aonesync",
        val jobId = client.submit(
            TDJobRequest.newHiveQuery("tech_batch", File("kotlinDriver.sql").readText())
        )

        // Poll at a fixed 500 ms interval; the iteration bound only guards against spinning forever.
        for (counter in 0..10000000) {
            if (client.jobStatus(jobId).getStatus().isFinished()) break
            printerr("waiting result... now iter ${counter}...")
            Thread.sleep(500)
        }

        val jobInfo: TDJob = client.jobInfo(jobId)
        println("log:\n ${jobInfo.getCmdOut()}")
        println("error log:\n ${jobInfo.getStdErr()}")

        // TDHandler is declared outside this file; it supplies the unpacker over the job result.
        val unpacker = TDHandler().unpackerHandler(client, jobId)
        println("Unpackerが呼び出されました")
        while (unpacker.hasNext()) {
            f1(unpacker.unpackValue().asArrayValue())
        }
    } finally {
        // Flush and close the output file first so exported rows survive a failure,
        // then release the client's underlying connections.
        close()
        client.close()
    }
    println("Finished access to TresureData Database.")
    // Explicit exit is kept from the original so the process terminates immediately.
    System.exit(0)
}