Arhn - архитектура программирования

Сериализатор Kryo вызывает исключение в базовом классе Scala WrappedArray

Два вопроса, ответ на общий поможет мне понять, насколько минимально я могу сделать MVCE.

1) Как узнать, что WrappedArray нужно зарегистрировать заранее (и любой другой класс в Scala, который я мог бы использовать)? Нормально ли регистрировать классы из библиотек в Kryo?

и конкретный:

2) Как это исправить? (Готов признать, что у меня может быть что-то еще странное, если я отражаю здесь ложную ошибку, так что не убивайте себя, пытаясь воспроизвести это)

ДЕТАЛИ

Тестирование программы Spark на Java с использованием наших пользовательских классов, связанных с генетикой и статистикой, на Spark 1.4.1, Scala 2.11.5 со следующими настройками в SparkConf:

// for kyro serializer it wants to register all classes that need to be serialized
Class[] kryoClassArray = new Class[]{DropResult.class, DropEvaluation.class, PrintHetSharing.class};

SparkConf sparkConf = new SparkConf().setAppName("PipeLinkageData")
                <SNIP other settings to declare master>
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                //require registration of all classes with Kryo
                .set("spark.kryo.registrationRequired", "true")
                .registerKryoClasses(kryoClassArray);

Получение этой ошибки (повторяется в конце длинного списка ошибок):

Caused by: java.lang.IllegalArgumentException: Class is not
registered: scala.collection.mutable.WrappedArray$ofRef Note: To
register this class use:
kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);

Но я никогда не вызываю этот класс из своего кода. Я могу добавить scala.collection.mutable.WrappedArray в kryoClassArray, но это не решит проблему. Если я добавлю scala.collection.mutable.WrappedArray$ofRef.class (как предложено в ошибке), это синтаксическая ошибка, я вижу, что не могу объявить здесь анонимную функцию?

MVCE: я запустил MVCE, но проблема в том, что для этого с нашими классами требуются внешние библиотеки и текстовые файлы/файлы данных. Как только я уберу наши классы, у меня не будет проблемы. Если бы кто-то мог ответить на общий вопрос, это могло бы помочь мне понять, какую часть MVCE я могу придумать.

Пока я пишу этот вопрос, я получил добро на обновление до 1.5.2, посмотрю, есть ли там какие-либо изменения, и обновлю вопрос, если да.

Если не считать MVCE, вот мои объявления классов:

public class MVCEPipeLinkageInterface extends LinkageInterface implements Serializable {

class PrintHetSharing implements VoidFunction<DropResult> {

class SparkDoDrop implements Function<Integer, Integer> {

Полные ошибки:

16/01/08 10:54:54 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
16/01/08 10:54:55 INFO SparkDeploySchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://[email protected]:55646/user/Executor#214759698]) with ID 0
16/01/08 10:54:55 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it.
java.io.IOException: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef
Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
    at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:226)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:295)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:293)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:293)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.makeOffers(CoarseGrainedSchedulerBackend.scala:167)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receiveAndReply$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:143)
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:178)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:127)
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:198)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:126)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:93)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef
Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);

  • 1. Знание того, какие классы потребуют сериализации, требует от вас проверки вашего кода и понимания того, что он делает (вы вставили только пример конфигурации, а не использование). 2. То же, что и 1, без примера кода ответить невозможно. 13.01.2016
  • Конечно, @DanielL. Я отредактирую код. Но похоже, вы говорите, что мне нужно знать каждый базовый класс? Как общий принцип? Я пишу на Java, поэтому я не ожидал, что мне понадобится знать базовые классы Scala, чтобы заставить Kryo работать. Спасибо 13.01.2016
  • @ДэниелЛ. Я ценю запрос MVCE, проблема, с которой я столкнулся, заключается в том, что для его выполнения с нашими классами требуются внешние библиотеки и текстовые файлы/файлы данных. Как только я уберу наши классы и потребность в наших файлах, у меня не будет проблемы. Если бы кто-то мог ответить на общий вопрос, это могло бы помочь мне понять, какую часть MVCE я могу придумать. Я реализую Serializable во всех классах либо явно, либо путем реализации функций из Spark, таких как импорт org.apache.spark.api.java.function.Function и org.apache.spark.api.java.function.VoidFunction. 13.01.2016

Ответы:


1

В Scala вы должны исправить эту проблему, добавив 'scala.collection.mutable.WrappedArray.ofRef[_]' в качестве зарегистрированного класса, как в следующем фрагменте кода:

conf.registerKryoClasses(
  Array(
    ...
    classOf[Person],
    classOf[Array[Person]],
    ...
    classOf[scala.collection.mutable.WrappedArray.ofRef[_]]
  )
)
24.05.2016
  • Сначала ваш ответ выглядит как Scala, но я на Java, но я понимаю :) Я ценю ответ, но основной вопрос остается без ответа, почему я должен объявлять этот класс, если я его не использую? Мне не нужно объявлять каждый класс в Spark, почему именно этот? Я не пытался использовать Kryo какое-то время, я должен повторно реализовать его сейчас, когда наше решение намного дальше, а Spark на пару версий новее. +1 хотя, спасибо! 24.05.2016
  • не уверен, почему в вопросе не было тега java, мой плохой, извините, он был в вопросе, но не в тегах, упс 24.05.2016
  • принял этот ответ сейчас, когда я пересматриваю его, он не дает полного ответа, поскольку этот код Scala не работает в Java. Все таки это ближе к конкретному ответу. Я мог бы поклясться в другом вопросе, который кто-то опубликовал, как добавить этот класс Scala в массив Java, используя пример .ofRef[] или $ofRef в стиле Java, оба не работают. На данный момент я ослабил необходимые настройки Kryo. 21.06.2016

  • 2

    Вам не нужно делать все сериализуемым, независимо от того, является ли оно частью клиентской библиотеки или нет. Но вам ДЕЙСТВИТЕЛЬНО нужно сделать любую лямбду, которая будет действовать на исполнителей, сериализуемой. Они не запускаются на главном узле, поэтому нет способа предотвратить сериализацию (и вы этого не хотите, поскольку вся цель Spark — распределенные вычисления).

    Для примеров и тому подобного (и если вы еще не совсем поняли концепцию), проверьте официальные документы по этому поводу.

    13.01.2016
  • Спасибо, прояснение общего вопроса позволяет мне узнать, на чем сосредоточить свои усилия, очень полезно! Я все еще немного озадачен, почему класс scala WrappedArray сообщается как тот, который не может быть сериализован. Я разберу свой код и соберу его обратно. Я понимаю анонимные функции и использую их при использовании встроенных классов — когда я использую наши классы, я объявляю их отдельно. Я все еще буду работать над MVCE, еще раз спасибо 14.01.2016
  • Новые материалы

    Коллекции публикаций по глубокому обучению
    Последние пару месяцев я создавал коллекции последних академических публикаций по различным подполям глубокого обучения в моем блоге https://amundtveit.com - эта публикация дает обзор 25..

    Представляем: Pepita
    Фреймворк JavaScript с открытым исходным кодом Я знаю, что недостатка в фреймворках JavaScript нет. Но я просто не мог остановиться. Я хотел написать что-то сам, со своими собственными..

    Советы по коду Laravel #2
    1-) Найти // You can specify the columns you need // in when you use the find method on a model User::find(‘id’, [‘email’,’name’]); // You can increment or decrement // a field in..

    Работа с временными рядами спутниковых изображений, часть 3 (аналитика данных)
    Анализ временных рядов спутниковых изображений для данных наблюдений за большой Землей (arXiv) Автор: Рольф Симоэс , Жильберто Камара , Жильберто Кейрос , Фелипе Соуза , Педро Р. Андраде ,..

    3 способа решить квадратное уравнение (3-й мой любимый) -
    1. Методом факторизации — 2. Используя квадратичную формулу — 3. Заполнив квадрат — Давайте поймем это, решив это простое уравнение: Мы пытаемся сделать LHS,..

    Создание VR-миров с A-Frame
    Виртуальная реальность (и дополненная реальность) стали главными модными терминами в образовательных технологиях. С недорогими VR-гарнитурами, такими как Google Cardboard , и использованием..

    Демистификация рекурсии
    КОДЕКС Демистификация рекурсии Упрощенная концепция ошеломляющей О чем весь этот шум? Рекурсия, кажется, единственная тема, от которой у каждого начинающего студента-информатика..