[TIP] testing-in-python Digest, Vol 111, Issue 9

Kun Chen kunchen at everstring.com
Tue Apr 26 17:53:03 PDT 2016


Hi,

Thanks for the quick response.

I tried the pth way of starting coverage, it's working for the driver
process, and still not the worker process.

And I tried to patch the coverage source code into printing message or
writing something into a local file when it construct the Coverage instance
( of course from the coverage.process_startup() ), and the result is:

1. printing will got a java exception after spark-submit like the following
java.lang.IllegalArgumentException: port out of range:1668247142
at java.net.InetSocketAddress.checkPort(InetSocketAddress.java:143)
at java.net.InetSocketAddress.<init>(InetSocketAddress.java:185)
at java.net.Socket.<init>(Socket.java:241)
at
org.apache.spark.api.python.PythonWorkerFactory.createSocket$1(PythonWorkerFactory.scala:75)
at
org.apache.spark.api.python.PythonWorkerFactory.liftedTree1$1(PythonWorkerFactory.scala:90)
at
org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:89)
at
org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:62)
at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:135)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:101)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

2. writing a local file will have no effect at all, though writing file in
my customized rdd map function will work. like the following

import os
from multiprocessing import *
pid = current_process().pid

def handle(sc, file, ofile):
    rd = sc.textFile(file)
    rd.map(mysub).saveAsTextFile(ofile)

def mysub(row):
    print 'from mapper process {0}'.format(pid)
    print 'env : {0}'.format(os.getenv('COVERAGE_PROCESS_START'))
    f = open('/home/kunchen/{0}.txt'.format(pid), 'a')
    f.writelines([row])
    f.close()

    return row.replace(',',' ').replace('.',' ').replace('-',' ').lower()

I'm quite new to Spark and not sure how the worker process are executed.
Anyone ever tried to tackle this problem?

On Wed, Apr 27, 2016 at 3:00 AM, <testing-in-python-request at lists.idyll.org>
wrote:

> Send testing-in-python mailing list submissions to
>         testing-in-python at lists.idyll.org
>
> To subscribe or unsubscribe via the World Wide Web, visit
>         http://lists.idyll.org/listinfo/testing-in-python
> or, via email, send a message with subject or body 'help' to
>         testing-in-python-request at lists.idyll.org
>
> You can reach the person managing the list at
>         testing-in-python-owner at lists.idyll.org
>
> When replying, please edit your Subject line so it is more specific
> than "Re: Contents of testing-in-python digest..."
>
>
> Today's Topics:
>
>    1. how to generate coverage info for pyspark applications (Kun Chen)
>    2. Re: how to generate coverage info for pyspark applications
>       (Ned Batchelder)
>
>
> ----------------------------------------------------------------------
>
> Message: 1
> Date: Tue, 26 Apr 2016 21:01:02 +0800
> From: Kun Chen <kunchen at everstring.com>
> Subject: [TIP] how to generate coverage info for pyspark applications
> To: testing-in-python at lists.idyll.org
> Message-ID:
>         <
> CAPTVxySrrmtV7kqYap0JJUnrctR5ifspXCwjtPFL1TCodjDdcQ at mail.gmail.com>
> Content-Type: text/plain; charset="utf-8"
>
> Hi, all
>
> I tried to run a simple pyspark application on spark in local mode, and was
> hoping to get the coverage data file generated somewhere for future use.
>
> 0. I put the following lines at the head of
> /usr/lib/python2.7/sitecustomize.py
> import coverage
> coverage.process_startup()
>
> 1. I set the following env variable in ~/.bashrc
> export COVERAGE_PROCESS_START=/home/kunchen/git/es-signal/.coveragerc
>
> 2. the config file '/home/kunchen/git/es-signal/.coveragerc' has following
> content
> [run]
> parallel = True
> concurrency = multiprocessing
> omit =
>     *dist-packages*
>     *pyspark*
>     *spark-1.5.2*
> cover_pylib = False
> data_file = /home/kunchen/.coverage
>
> 3. I put ci3.py and test.py both
> in /home/kunchen/Downloads/software/spark-1.5.2 ( my spark home )
>
> 4. in my spark home, I ran the following command to submit and run the
> code.
> spark-submit --master local --py-files=ci3.py test.py
>
>
> 6. after the application finished, I got two coverage files in
> /home/kunchen
> .coverage.kunchen-es-pc.31117.003485
> .coverage.kunchen-es-pc.31176.826660
>
> but according to the process id in the file names and the content of those
> files, none of them was generated by the spark worker process(or thread?
> not sure here).
>
> My question is what I have to do to get the coverage data of the code being
> executed by the spark workers?
> -------------- next part --------------
> An HTML attachment was scrubbed...
> URL: <
> http://lists.idyll.org/pipermail/testing-in-python/attachments/20160426/a4ea1c92/attachment-0001.htm
> >
> -------------- next part --------------
> A non-text attachment was scrubbed...
> Name: ci3.py
> Type: text/x-python
> Size: 369 bytes
> Desc: not available
> URL: <
> http://lists.idyll.org/pipermail/testing-in-python/attachments/20160426/a4ea1c92/attachment-0002.py
> >
> -------------- next part --------------
> A non-text attachment was scrubbed...
> Name: test.py
> Type: text/x-python
> Size: 231 bytes
> Desc: not available
> URL: <
> http://lists.idyll.org/pipermail/testing-in-python/attachments/20160426/a4ea1c92/attachment-0003.py
> >
>
> ------------------------------
>
> Message: 2
> Date: Tue, 26 Apr 2016 11:32:38 -0400
> From: Ned Batchelder <ned at nedbatchelder.com>
> Subject: Re: [TIP] how to generate coverage info for pyspark
>         applications
> To: testing-in-python at lists.idyll.org
> Message-ID: <c4f7fd43-60e8-b863-fb1c-862d301ae9a0 at nedbatchelder.com>
> Content-Type: text/plain; charset="windows-1252"; Format="flowed"
>
> I don't know anything about spark, so I'm not sure how it starts up its
> workers.  My first suggestion would be to use the .pth method of
> starting coverage in subprocesses rather than the sitecustomize
> technique, and see if that works better.
>
> --Ned.
>
>
> On 4/26/16 9:01 AM, Kun Chen wrote:
> > Hi, all
> >
> > I tried to run a simple pyspark application on spark in local mode,
> > and was hoping to get the coverage data file generated somewhere for
> > future use.
> >
> > 0. I put the following lines at the head of
> > /usr/lib/python2.7/sitecustomize.py
> > import coverage
> > coverage.process_startup()
> >
> > 1. I set the following env variable in ~/.bashrc
> > export COVERAGE_PROCESS_START=/home/kunchen/git/es-signal/.coveragerc
> >
> > 2. the config file '/home/kunchen/git/es-signal/.coveragerc' has
> > following content
> > [run]
> > parallel = True
> > concurrency = multiprocessing
> > omit =
> >     *dist-packages*
> >     *pyspark*
> >     *spark-1.5.2*
> > cover_pylib = False
> > data_file = /home/kunchen/.coverage
> >
> > 3. I put ci3.py and test.py both
> > in /home/kunchen/Downloads/software/spark-1.5.2 ( my spark home )
> >
> > 4. in my spark home, I ran the following command to submit and run the
> > code.
> > spark-submit --master local --py-files=ci3.py test.py
> >
> >
> > 6. after the application finished, I got two coverage files in
> > /home/kunchen
> > .coverage.kunchen-es-pc.31117.003485
> > .coverage.kunchen-es-pc.31176.826660
> >
> > but according to the process id in the file names and the content of
> > those files, none of them was generated by the spark worker process(or
> > thread? not sure here).
> >
> > My question is what I have to do to get the coverage data of the code
> > being executed by the spark workers?
> >
> >
> >
> > _______________________________________________
> > testing-in-python mailing list
> > testing-in-python at lists.idyll.org
> > http://lists.idyll.org/listinfo/testing-in-python
>
> -------------- next part --------------
> An HTML attachment was scrubbed...
> URL: <
> http://lists.idyll.org/pipermail/testing-in-python/attachments/20160426/faa13f8a/attachment-0001.htm
> >
>
> ------------------------------
>
> _______________________________________________
> testing-in-python mailing list
> testing-in-python at lists.idyll.org
> http://lists.idyll.org/listinfo/testing-in-python
>
>
> End of testing-in-python Digest, Vol 111, Issue 9
> *************************************************
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.idyll.org/pipermail/testing-in-python/attachments/20160427/21301b08/attachment.htm>


More information about the testing-in-python mailing list