How can I load Avros in Spark using the schema on-board the Avro file(s)?


I'm running CDH 4.4 with Spark 0.9.0 from a Cloudera parcel.

I have a bunch of Avro files created via Pig's AvroStorage UDF. I want to load these files in Spark and work with them as generic records, using the schema carried on-board the Avro files themselves. So far I've tried this:

import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.commons.lang.StringEscapeUtils.escapeCsv

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import java.net.URI
import java.io.BufferedInputStream
import java.io.File
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.file.DataFileStream
import org.apache.avro.io.DatumReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.mapred.FsInput

val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00016.avro"
val inURI = new URI(input)
val inPath = new Path(inURI)

val fsInput = new FsInput(inPath, sc.hadoopConfiguration)
val reader = new GenericDatumReader[GenericRecord]
val dataFileReader = DataFileReader.openReader(fsInput, reader)
val schemaString = dataFileReader.getSchema

val buf = scala.collection.mutable.ListBuffer.empty[GenericRecord]
while (dataFileReader.hasNext) {
  buf += dataFileReader.next
}
sc.parallelize(buf)
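Since the writer's schema travels inside the file, you can also inspect it and the field values straight from this local reader. A minimal sketch continuing from the code above; "rawlog" is just the field name used later in this post, substitute your own:

// Pretty-print the schema embedded in the Avro file
println(dataFileReader.getSchema.toString(true))

// Peek at one field of the first few buffered records
buf.take(5).foreach(r => println(r.get("rawlog")))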

This works for one file, but it can't scale: it loads all the data into local RAM and only then distributes it across the Spark nodes.

To answer my own question:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapred.AvroInputFormat
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.commons.lang.StringEscapeUtils.escapeCsv

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import java.io.BufferedInputStream
import org.apache.avro.file.DataFileStream
import org.apache.avro.io.DatumReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.mapred.FsInput
import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import org.apache.hadoop.mapred.JobConf
import java.io.File
import java.net.URI

// spark-shell -usejavacp -classpath "*.jar"

val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00016.avro"

val jobConf = new JobConf(sc.hadoopConfiguration)
val rdd = sc.hadoopFile(
  input,
  classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
  classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
  classOf[org.apache.hadoop.io.NullWritable],
  10)
val f1 = rdd.first
val a = f1._1.datum // unwrap the GenericRecord from the AvroWrapper key
a.get("rawlog")     // access Avro fields
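From there the records can be processed in parallel instead of being materialized locally. A minimal sketch, assuming the same "rawlog" field as above; the glob path is an illustration only, since hadoopFile accepts glob patterns like any Hadoop input path:

// The key of each pair is an AvroWrapper around the GenericRecord.
// Convert fields to plain values right away: the wrapper object may be
// reused between records by the input format.
val rawLogs = rdd.map { case (wrapper, _) => wrapper.datum.get("rawlog").toString }
rawLogs.take(10).foreach(println)

// Globs work too, so a whole directory of part files loads the same way
val hourly = sc.hadoopFile(
  "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/*.avro",
  classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
  classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
  classOf[org.apache.hadoop.io.NullWritable],
  10)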
