Writes query results from a
SELECTstatement to the specified data format. Supported formats forUNLOADincludeApache Parquet,ORC,Apache Avro, andJSON.CSVis the only output format used by theAthenaSELECTquery, but you can useUNLOADto write the output of aSELECTquery to the formats thatUNLOADsupports.Although you can use the
CTASstatement to output data in formats other thanCSV, those statements also require the creation of a table in Athena. TheUNLOADstatement is useful when you want to output the results of aSELECTquery in anon-CSVformat but do not require the associated table. For example, a downstream application might require the results of aSELECTquery to be inJSONformat, andParquetorORCmight provide a performance advantage overCSVif you intend to use the results of theSELECTquery for additional analysis.
noctua v-2.2.0.9000+ can now leverage this functionality
with the unload parameter within dbGetQuery,
dbSendQuery, dbExecute. This functionality
offers faster performance for mid to large result sizes.
unload=FALSE (Default)Regular query on AWS Athena and then reads the table
data as CSV directly from AWS S3.
PROS:
CONS:
unload=TRUEWraps the query with a UNLOAD and then reads the table
data as parquet directly from AWS S3.
PROS:
CONS:
order by due to multiple parquet
files being produced by AWS Athena.Set up AWS Athena table (example taken from AWS
Data Wrangler: Amazon Athena Tutorial):
# Python
import awswrangler as wr
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
if "awswrangler_test" not in wr.catalog.databases().values:
wr.catalog.create_database("awswrangler_test")
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
df = wr.s3.read_csv(
path="s3://noaa-ghcn-pds/csv/189",
names=cols,
parse_dates=["dt", "obs_time"]) # Read 10 files from the 1890 decade (~1GB)
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
database="awswrangler_test",
table="noaa"
);
wr.catalog.table(database="awswrangler_test", table="noaa")Benchmark unload method using noctua.
# R
library(DBI)
con <- dbConnect(noctua::athena())
dbGetQuery(con, "select count(*) as n from awswrangler_test.noaa")
# Info: (Data scanned: 0 Bytes)
# n
# 1: 29554197
# Query ran using CSV output
system.time({
df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa")
})
# Info: (Data scanned: 80.88 MB)
# user system elapsed
# 57.004 8.430 160.567
dim(df)
# [1] 29554197 8
noctua::noctua_options(cache_size = 1)
# Query ran using UNLOAD Parquet output
system.time({
df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = T)
})
# Info: (Data scanned: 80.88 MB)
# user system elapsed
# 21.622 2.350 39.232
dim(df)
# [1] 29554197 8
# Query ran using cached UNLOAD Parquet output
system.time({
df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = T)
})
# Info: (Data scanned: 80.88 MB)
# user system elapsed
# 13.738 1.886 11.029
dim(df)
# [1] 29554197 8| Method | Time (seconds) |
|---|---|
unload=FAlSE |
160.567 |
unload=TRUE |
39.232 |
Cache unload=TRUE |
11.029 |
From this simple benchmark test there is a significant improvement in
the performance when querying AWS Athena while
unload=TRUE.
Note: Benchmark ran on AWS Sagemaker
ml.t3.xlarge instance.
unload = TRUE on package level:Another method to set unload=TRUE is to use
noctua_options(). By setting
noctua_options(unload=TRUE), unload is set to
TRUE package level and all DBI functionality
will use it when applicable.
library(DBI)
library(noctua)
con <- dbConnect(athena())
noctua_options(unload = TRUE)
dbi_noaa = dbGetQuery(con, "select * from awswrangler_test.noaa")This also give benefits for when using dplyr
functionality. When setting noctua_options(unload=TRUE) all
dplyr lazy evaluation will start using
AWS Athena unload.
tbl_noaa = tbl(con, dbplyr::in_schema("awswrangler_test", "noaa"))
tbl_noaa %>% collect()
#> # A tibble: 29,554,197 × 8
#> id dt element value m_flag q_flag s_flag obs_time
#> <chr> <dttm> <chr> <int> <chr> <chr> <chr> <chr>
#> 1 ASN00074198 1890-01-05 00:00:00 PRCP 0 NA NA a NA
#> 2 ASN00074222 1890-01-05 00:00:00 PRCP 0 NA NA a NA
#> 3 ASN00074227 1890-01-05 00:00:00 PRCP 0 NA NA a NA
#> 4 ASN00075001 1890-01-05 00:00:00 PRCP 0 NA NA a NA
#> 5 ASN00075005 1890-01-05 00:00:00 PRCP 0 NA NA a NA
#> 6 ASN00075006 1890-01-05 00:00:00 PRCP 0 NA NA a NA
#> 7 ASN00075011 1890-01-05 00:00:00 PRCP 0 NA NA a NA
#> 8 ASN00075013 1890-01-05 00:00:00 PRCP 0 NA NA a NA
#> 9 ASN00075014 1890-01-05 00:00:00 PRCP 0 NA NA a NA
#> 10 ASN00075018 1890-01-05 00:00:00 PRCP 0 NA NA a NA
#> # … with 29,554,187 more rows
noaa %>% filter(element == "PRCP") %>% collect()
#> # A tibble: 15,081,580 × 8
#> id dt element value m_flag q_flag s_flag obs_time
#> <chr> <dttm> <chr> <int> <chr> <chr> <chr> <chr>
#> 1 SWE00140492 1890-01-06 00:00:00 PRCP 0 NA NA E NA
#> 2 SWE00140594 1890-01-06 00:00:00 PRCP 4 NA NA E NA
#> 3 SWE00140746 1890-01-06 00:00:00 PRCP 0 NA NA E NA
#> 4 SWE00140828 1890-01-06 00:00:00 PRCP 0 NA NA E NA
#> 5 SWM00002080 1890-01-06 00:00:00 PRCP 0 NA NA E NA
#> 6 SWM00002485 1890-01-06 00:00:00 PRCP 1 NA NA E NA
#> 7 SWM00002584 1890-01-06 00:00:00 PRCP 0 NA NA E NA
#> 8 TSE00147769 1890-01-06 00:00:00 PRCP 33 NA NA E NA
#> 9 TSE00147775 1890-01-06 00:00:00 PRCP 150 NA NA E NA
#> 10 UK000047811 1890-01-06 00:00:00 PRCP 49 NA NA E NA
# … with 15,081,570 more rows