How can I load Avros in Spark using the schema on-board the Avro file(s)?
I am running CDH 4.4 with Spark 0.9.0 from a Cloudera parcel.
I have a bunch of Avro files created via Pig's AvroStorage UDF. I want to load these files into Spark, using a GenericRecord or the schema on-board the Avro files. So far I've tried this:
    import org.apache.avro.mapred.AvroKey
    import org.apache.avro.mapreduce.AvroKeyInputFormat
    import org.apache.hadoop.io.NullWritable
    import org.apache.commons.lang.StringEscapeUtils.escapeCsv
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.conf.Configuration
    import java.net.URI
    import java.io.BufferedInputStream
    import java.io.File
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
    import org.apache.avro.specific.SpecificDatumReader
    import org.apache.avro.file.DataFileStream
    import org.apache.avro.io.DatumReader
    import org.apache.avro.file.DataFileReader
    import org.apache.avro.mapred.FsInput

    val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00016.avro"
    val inURI = new URI(input)
    val inPath = new Path(inURI)
    val fsInput = new FsInput(inPath, sc.hadoopConfiguration)
    val reader = new GenericDatumReader[GenericRecord]
    val dataFileReader = DataFileReader.openReader(fsInput, reader)
    val schemaString = dataFileReader.getSchema
    val buf = scala.collection.mutable.ListBuffer.empty[GenericRecord]
    while (dataFileReader.hasNext) { buf += dataFileReader.next }
    sc.parallelize(buf)
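For reference, once the DataFileReader is open you can also inspect the writer schema embedded in the file header (the "on-board" schema I'm asking about). A minimal sketch, assuming whatever field names your Pig script wrote:

    import scala.collection.JavaConverters._

    // List the fields of the schema stored in the Avro file header.
    val onBoardSchema = dataFileReader.getSchema
    onBoardSchema.getFields.asScala.foreach { f =>
      println(s"${f.name}: ${f.schema.getType}")
    }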
This works for one file, but it can't scale: it loads all of the data into local RAM and only then distributes it across the Spark nodes.

To answer my own question:
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.avro.generic.GenericRecord
    import org.apache.avro.mapred.AvroKey
    import org.apache.avro.mapred.AvroInputFormat
    import org.apache.avro.mapreduce.AvroKeyInputFormat
    import org.apache.hadoop.io.NullWritable
    import org.apache.commons.lang.StringEscapeUtils.escapeCsv
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.conf.Configuration
    import java.io.BufferedInputStream
    import org.apache.avro.file.DataFileStream
    import org.apache.avro.io.DatumReader
    import org.apache.avro.file.DataFileReader
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
    import org.apache.avro.mapred.FsInput
    import org.apache.avro.Schema
    import org.apache.avro.Schema.Parser
    import org.apache.hadoop.mapred.JobConf
    import java.io.File
    import java.net.URI

    // spark-shell -usejavacp -classpath "*.jar"

    val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00016.avro"
    val jobConf = new JobConf(sc.hadoopConfiguration)

    // Read the Avro file(s) as an RDD of (AvroWrapper[GenericRecord], NullWritable)
    // pairs; the last argument is the minimum number of splits.
    val rdd = sc.hadoopFile(
      input,
      classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
      classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
      classOf[org.apache.hadoop.io.NullWritable],
      10)
    val f1 = rdd.first
    val a = f1._1.datum
    a.get("rawlog") // access Avro fields
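From there the whole RDD can be mapped to pull individual fields out. A rough sketch, assuming the same "rawlog" field as above, and converting to String because the records coming out of the input format are reused Avro objects:

    // Extract one field from every record as a plain String.
    val rawLogs = rdd.map { case (wrapper, _) => wrapper.datum.get("rawlog").toString }
    rawLogs.take(5).foreach(println)

The on-board schema is also still available on the distributed side, e.g. via wrapper.datum.getSchema inside the map.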