Spark (и Hadoop) не поддерживает чтение частей двоичных файлов HDF5. (Я подозреваю, что причина этого в том, что HDF5 является контейнерным форматом для хранения документов и позволяет указывать древовидную иерархию для документов).
Но если вам нужно прочитать файл с локального диска - это выполнимо с помощью Spark, особенно если вы знаете внутреннюю структуру вашего файла HDF5.
Вот пример - предполагается, что вы будете запускать локальное искровое задание, и вы заранее знаете, что ваш набор данных HDF5 '/mydata' состоит из 100 фрагментов.
h5file_path="/absolute/path/to/file"
def readchunk(v):
empty = h5.File(h5file_path)
return empty['/mydata'][v,:]
foo = sc.parallelize(range(0,100)).map(lambda v: readchunk(v))
foo.count()
Идя дальше, вы можете изменить программу, чтобы определить количество фрагментов, используя f5['/mydata'].shape[0]
Следующим шагом будет перебор нескольких наборов данных (вы можете перечислить наборы данных с помощью f5.keys()
).
Также есть другая статья "От наборов данных HDF5 к Apache Spark RDD", которые описывают аналогичный подход.
Тот же подход сработает и в распределенном кластере, но он малоэффективен. h5py требует, чтобы файл находился в локальной файловой системе. Таким образом, этого можно добиться несколькими способами: скопировать файл на все рабочие процессы и сохранить его в одном и том же месте на диске рабочих процессов; или поместите файл в HDFS и смонтируйте HDFS с помощью fusefs, чтобы рабочие могли получить доступ к файлу. Оба способа имеют некоторую неэффективность, но этого должно быть достаточно для специальных задач.
Вот оптимизированная версия, которая открывает h5 только один раз на каждом исполнителе:
h5file_path="/absolute/path/to/file"
_h5file = None
def readchunk(v):
# code below will be executed on executor - in another python process on remote server
# original value for _h5file (None) is sent from driver
# and on executor is updated to h5.File object when the `readchunk` is called for the first time
global _h5file
if _h5file is None:
_h5file = h5.File(h5file_path)
return _h5file['/mydata'][v,:]
foo = sc.parallelize(range(0,100)).map(lambda v: readchunk(v))
foo.count()
24.06.2015