Amazon Athena - это интерактивная служба запросов, которая упрощает анализ данных в Amazon S3 с использованием стандартного SQL. Athena не имеет сервера, поэтому нет инфраструктуры для управления. Athena прямо из коробки интегрирована с каталогом данных AWS Glue.

Что такое AWS Glue?

AWS Glue - это полностью управляемая служба извлечения, преобразования и загрузки (ETL), которая упрощает для клиентов подготовку и загрузку данных для аналитики. Мы можем просто указать AWS Glue наши данные, хранящиеся на AWS S3, и AWS Glue обнаружит наши данные и сохранит связанные метаданные (например, определение таблицы и схему) в каталоге данных AWS Glue. После каталогизации наши данные сразу становятся доступными для поиска, запросов и становятся доступными для ETL.

Как настроить AWS Glue?

Чтобы сначала настроить AWS Glue, нам нужно добавить базу данных, в которой будут сохраняться наши данные. Чтобы добавить базу данных, мы можем просто перейти в консоль AWS Glue и на вкладке Базы данных нажать на Добавить базу данных.

После добавления базы данных нам нужно добавить Crawler. Сканер используется для заполнения каталога данных AWS Glue таблицами. он может сканировать несколько хранилищ данных за один запуск. Сканеры могут сканировать как файловые, так и табличные хранилища данных. Чтобы добавить сканера, нам нужно перейти на вкладку Поисковые роботы в консоли AWS Glue и нажать на Добавить сканера. Нам нужно передать имя базы данных и сегмент S3 (где хранятся наши файлы CSV) на страницу создания поискового робота. Теперь нам нужно ЗАПУСТИТЬ краулер для извлечения данных из корзины S3.

Как настроить Amazon Athena?

После настройки AWS Glue Crawler нам нужно выбрать AwsDataCatalog на вкладке Источник данных на консоли AWS Athena. Теперь мы можем найти наши базы данных на вкладке «База данных». После выбора базы данных мы можем найти таблицы, связанные с базой данных, на вкладке Таблицы.

Как использовать Amazon Athena в Node.js?

Чтобы использовать Amazon Athena в Node.js, мы можем выполнить следующие шаги.

Шаг 1. Создайте идентификатор выполнения запроса

function createQueryExecutionId(callback) {
var params = {
QueryString: `SELECT * FROM "database"."Table"`,
QueryExecutionContext: {
Database: 'pricedb' /* required */
},
ResultConfiguration: { EncryptionConfiguration: 
{ EncryptionOption: 'SSE_S3', /* required */ },
OutputLocation: 's3://your-bucket/output' },
};
athena.startQueryExecution(params, function (err, data) {
callback(err ? err.stack : err, data)
})}

Шаг 2. Проверьте статус созданного идентификатора выполнения запроса на последнем шаге.

function checkQueryCreateStatus(callback) {
const params = { QueryExecutionId: this.QueryExecutionId 
/* required */ };
athena.getQueryExecution(params, function (err, data) {
if (err) console.log(err, err.stack); // an error occurred
else {
if (data && data.QueryExecution && data.QueryExecution.Status && data.QueryExecution.Status.State && data.QueryExecution.Status.State === 'RUNNING') {
console.log("Athena Query status is running");
callback("RUNNING")}
else {
console.log("Atehna Query status is Active")
callback(err ? err.stack : err, data)}}
})}

Шаг 3: Получите результат запроса, когда статус запроса Athena активен.

function getQueryResultByExecutionId(queryExecutionId, callback) {
const params = { QueryExecutionId: queryExecutionId };
athena.getQueryResults(params, function (err, data) {
callback(err ? err.stack : err, data)
})}

Шаг 4. Остановите идентификатор выполнения запроса Athena

function stopQueryExecutionId(queryExecutionId, callback) {
const params = { QueryExecutionId: queryExecutionId };
athena.stopQueryExecution(params, function (err, data) {
callback(err ? err.stack : err, data)
})}

Шаг 5. Теперь получите данные из AWS Athena.

const aws = require('aws-sdk');
const async = require('async');

aws.config.update({
accessKeyId: 'YOUR_ACCESS_KEY',
secretAccessKey: 'YOUR_SECRET_KEY',
region: 'REGION' })
const athena = new aws.Athena();

const getData = () => new Promise((resolve, reject) => {
async.waterfall([ (callback) => {
createQueryExecutionId(callback)},
(query, callback) => { async.retry({times: 60,interval: 1000}, 
checkQueryCreateStatus.bind(query), (err, result) => {
if (!err) { callback(null, query)}
else { callback(err) }});
}, (query, callback) => {
getQueryResultByExecutionId(query.QueryExecutionId, (err, result) => {
callback(null, result, query)})
}, (queryResult, query, callback) => {
stopQueryExecutionId(query.QueryExecutionId, (err, result) => {
callback(err, queryResult)});
}], (error, result) => {
if (result) { resolve(result)} 
else { reject({ error })}
})});

Спасибо за чтение.

Если вы сочтете эту статью полезной, хлопните в ладоши. :)