Access your Spark Cluster from Everywhere with Apache Livy

Andre Münch Blog, Data Science

Livy is a REST web service for submitting Spark Jobs or accessing – and thus sharing – long-running Spark Sessions from a remote place. Instead of tedious configuration and installation of your Spark client, Livy takes over the work and provides you with a simple and convenient interface.

We at STATWORX use Livy to submit Spark jobs from Apache’s workflow tool Airflow to volatile Amazon EMR clusters. In addition, several colleagues with different scripting language skills share a running Spark cluster.

Another great aspect of Livy is that you can choose from a range of languages: Java, Scala, Python, and R. As with Spark itself, which one you should or can use depends on your use case (and on your skills).

Figure: Livy architecture – https://livy.incubator.apache.org/

Apache Livy is still in the Incubator state, and the code can be found in the project’s Git repository.

When you should use it

Since REST APIs are easy to integrate into your application, you should use it when:

  • multiple clients want to share a Spark Session.
  • the clients are lean and should not be overloaded with installation and configuration.
  • you need a quick setup to access your Spark cluster.
  • you want to integrate Spark into an app on your mobile device.
  • you have volatile clusters, and you do not want to adapt configuration every time.
  • a remote workflow tool submits Spark jobs.

Preconditions

Livy is generally user-friendly and does not need much preparation. All you need is an HTTP client to communicate with Livy’s REST API. REST APIs are easy to access (states and lists can even be viewed in a browser), and HTTP(S) is a familiar protocol (status codes to handle exceptions, actions like GET and POST, etc.) that can be secured with the usual measures.

Since Livy is an agent for your Spark requests and carries your code (either as script snippets or as packages for submission) to the cluster, you still have to write code (or have someone write the code for you, or have a package ready for submission at hand).

I opted to mainly use Python as the Spark scripting language in this blog post, and also to interact with the Livy interface itself. Some examples were executed via curl, too.

How to use Livy

There are two modes to interact with the Livy interface:

  • Interactive Sessions have a running session where you can send statements over. Provided that resources are available, these will be executed, and output can be obtained. It can be used to experiment with data or to have quick calculations done.
  • Jobs/Batch submit code packages like programs. A typical use case is a regular task equipped with some arguments and workload done in the background. This could be a data preparation task, for instance, which takes input and output directories as parameters.

In the following, we will have a closer look at both cases and the typical process of submission. Each case will be illustrated by examples.

Interactive Sessions

Let’s start with an example of an interactive Spark Session. Throughout the example, I use Python and its requests package to send requests to and retrieve responses from the REST API. As mentioned before, you do not have to follow this path; you could use your preferred HTTP client instead (provided that it also supports POST and DELETE requests).

We start by creating a Spark Session. There are a number of parameters to configure (you can look up the specifics in the Livy documentation), but for this blog post we stick to the basics and specify only its name and the kind of code. If you have already submitted Spark code without Livy, parameters like executorMemory and (YARN) queue might sound familiar, and if you run more elaborate tasks that need extra packages, you will know that the jars parameter needs configuration as well.
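As a rough sketch of how such optional parameters could be added to the request body: the helper below is my own illustration and not part of Livy, and the values '2g' and 'default' are made-up examples. It only builds the dictionary that would be sent to /sessions and leaves out every option that was not set.

```python
def build_session_payload(name, kind="pyspark", executor_memory=None,
                          queue=None, jars=None):
    """Assemble the JSON body for POST /sessions, skipping unset options."""
    options = {
        "name": name,
        "kind": kind,
        "executorMemory": executor_memory,  # e.g. "2g" (example value)
        "queue": queue,                     # name of the YARN queue
        "jars": jars,                       # list of jar paths
    }
    # Drop everything that was left at None so Livy applies its defaults.
    return {key: value for key, value in options.items() if value is not None}


payload = build_session_payload("first-livy", executor_memory="2g", queue="default")
print(payload)
```

The resulting dictionary can be passed through json.dumps in the same way as the minimal data dictionary used in this post.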

To initiate the session we have to send a POST request to the directive /sessions along with the parameters.

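import json
import requests

# Placeholder host; 8998 is Livy's default port
LIVY_HOST = 'http://livy-server:8998'

directive = '/sessions'
headers = {'Content-Type': 'application/json'}

data = {'kind': 'pyspark', 'name': 'first-livy'}

resp = requests.post(LIVY_HOST + directive, headers=headers, data=json.dumps(data))

# Livy answers with 201 Created once the session has been registered
if resp.status_code == requests.codes.created:
    session_id = resp.json()['id']
else:
    raise RuntimeError(f'Session creation failed: {resp.status_code} {resp.text}')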

Livy, in return, responds with an identifier for the session that we extract from its response.

Note that the session might need some boot time until YARN (a resource manager in the Hadoop world) has allocated all the resources. Meanwhile, we check the state of the session by querying the directive: /sessions/{session_id}/state. Once the state is idle, we are able to execute commands against it.
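The waiting step described above could look roughly like the following sketch. The function name, the timeout values, and the set of states treated as failures are my assumptions; the state request is passed in as a function so that the loop can be tried out without a running server.

```python
import time


def wait_until_idle(fetch_state, timeout_s=300, poll_s=2, sleep=time.sleep):
    """Poll fetch_state() until it returns 'idle'; fail on dead ends or timeout."""
    # Assumption: these states mean the session will never become idle.
    failed_states = {"error", "dead", "killed", "shutting_down"}
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        state = fetch_state()
        if state == "idle":
            return state
        if state in failed_states:
            raise RuntimeError(f"session ended in state {state!r}")
        sleep(poll_s)
    raise TimeoutError(f"session not idle after {timeout_s} s")


# Offline demonstration with a canned sequence of states instead of a server.
states = iter(["starting", "starting", "idle"])
print(wait_until_idle(lambda: next(states), sleep=lambda seconds: None))
```

Against a real server, fetch_state could be something like lambda: requests.get(f'{LIVY_HOST}/sessions/{session_id}/state').json()['state'].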

To execute Spark code, statements are the way to go. The code is wrapped into the body of a POST request and sent to the directive /sessions/{session_id}/statements.

directive = f'/sessions/{session_id}/statements'

data = {'code':'...'}

resp = requests.post(LIVY_HOST+directive, headers=headers, data=json.dumps(data))

The response message provides the following attributes:

attribute   meaning
id          to identify the statement
code        the code, once again, that has been executed
state       the state of the execution
output      the output of the statement

The statement passes through several states (see below). Depending on your code, your interaction (a statement can also be cancelled), and the resources available, it will usually end up in the available state, which means that the output is ready. The crucial point here is that we have control over the status and can act accordingly.

Figure: statement states

By the way, a statement is cancelled via a POST request to /sessions/{session_id}/statements/{statement_id}/cancel

It is time now to submit a statement: Let us imagine being one of Gauss’s classmates and being asked to sum up the numbers from 1 to 999. Luckily, you have access to a Spark cluster and, even more luckily, it has the Livy REST API running, which we are connected to via our mobile app. All we have to do is write the following Spark code:

import textwrap

code = textwrap.dedent("""df = spark.createDataFrame(list(range(1,1000)),'int')
df.groupBy().sum().collect()[0]['sum(value)']""")

code_packed = {'code':code}

This is all the logic we need to define. The rest is the execution against the REST API:

import json
import time

directive = f'/sessions/{session_id}/statements'
resp = requests.post(LIVY_HOST + directive, headers=headers, data=json.dumps(code_packed))
if resp.status_code == requests.codes.created:
    stmt_id = resp.json()['id']

    # Poll the statement until it leaves the waiting/running states
    while True:
        info_resp = requests.get(LIVY_HOST + f'/sessions/{session_id}/statements/{stmt_id}')
        if info_resp.status_code == requests.codes.ok:
            state = info_resp.json()['state']
            if state in ('waiting', 'running'):
                time.sleep(2)
            elif state in ('cancelling', 'cancelled', 'error'):
                raise RuntimeError(f'Statement ended in state {state}')
            else:
                # state is 'available': the output is ready
                break
        else:
            raise RuntimeError(f'Status request failed: {info_resp.status_code}')
    print(info_resp.json()['output'])

else:
    # something went wrong with creation
    raise RuntimeError(f'Statement creation failed: {resp.status_code}')

Every 2 seconds, we check the state of the statement and treat the outcome accordingly: we stop monitoring as soon as the state equals available. Obviously, some more additions need to be made: the error state would probably be treated differently from the cancel cases, and it would also be wise to set up a timeout to jump out of the loop at some point.
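A minimal sketch of these two additions, under my own assumptions: the exception classes, the function name, and the default timeout are invented for illustration. The function that fetches the statement is passed in as a parameter, so the example below runs on canned responses rather than against a real Livy server.

```python
import time


class StatementFailed(Exception):
    """The statement ended in the 'error' state."""


class StatementCancelled(Exception):
    """The statement was cancelled before it finished."""


def wait_for_statement(fetch_statement, timeout_s=60, poll_s=2, sleep=time.sleep):
    """Poll until the statement is 'available' and return its output."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        stmt = fetch_statement()
        state = stmt["state"]
        if state == "available":
            return stmt["output"]
        if state == "error":
            raise StatementFailed(stmt.get("output"))
        if state in ("cancelling", "cancelled"):
            raise StatementCancelled(state)
        sleep(poll_s)  # still 'waiting' or 'running'
    raise TimeoutError(f"statement not finished after {timeout_s} s")


# Canned responses standing in for GET /sessions/{id}/statements/{id}.
responses = iter([
    {"state": "waiting"},
    {"state": "running"},
    {"state": "available", "output": {"status": "ok"}},
])
print(wait_for_statement(lambda: next(responses), sleep=lambda seconds: None))
```

With separate exception types, the caller can decide to retry after a cancellation but report an error straight away.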

Assuming the code was executed successfully, we take a look at the output attribute of the response:

{'status': 'ok', 'execution_count': 2, 'data': {'text/plain': '499500'}}

There we go, the answer is 499500.
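To pull the number out of this structure in code, a small helper along the following lines could be used. The helper name is hypothetical, and the layout of the error case (the keys ename and evalue) is an assumption on my side; only the ok case is taken from the output shown above.

```python
def statement_result(output):
    """Return the text/plain result of a finished statement, or raise on error."""
    if output["status"] != "ok":
        # Assumed error layout: 'ename' and 'evalue' describe the exception.
        raise RuntimeError(f"{output.get('ename')}: {output.get('evalue')}")
    return output["data"]["text/plain"]


output = {'status': 'ok', 'execution_count': 2, 'data': {'text/plain': '499500'}}
total = int(statement_result(output))
print(total)  # 499500
```

Note that the result arrives as text, so it has to be converted before doing any further arithmetic with it.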

Finally, we kill the session again to free resources for others:

directive = f'/sessions/{session_id}'
requests.delete(LIVY_HOST+directive)

Job Submission

We now want to move to a more compact solution. Say we have a package ready to solve some sort of problem, packed as a jar or as a Python script. All that needs to be added are some parameters, such as input files, an output directory, and some flags.

For the sake of simplicity, we will make use of the well-known Wordcount example, for which Spark ships an implementation: read a rather big file and determine how often each word appears. We again pick Python as the Spark language. This time, curl is used as the HTTP client.

As an example file, I have copied the Wikipedia entry found when typing in Livy. The text is actually about the Roman historian Titus Livius.

I have moved to the AWS cloud for this example because it offers a convenient way to set up a cluster equipped with Livy, and files can easily be stored in S3 by an upload handler. Let’s now see how we should proceed:

curl -X POST --data '{"file":"s3://livy-example/wordcount.py","args":["s3://livy-example/livy_life.txt"]}' \
-H "Content-Type: application/json" http://livy-server:8998/batches

The structure is quite similar to what we have seen before. By passing over the batch to Livy, we get an identifier in return along with some other information like the current state.

{"id":1,"name":null,"state":"running","appId":"application_1567416002081_0005",...}
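Hand-written JSON on the command line is easy to get wrong (a single missing quote is enough), so as a small sketch, the request body for the batch above can also be generated with Python's json module. The variable names are my own; the file and argument paths are the ones from the curl call.

```python
import json

# Same body as in the curl call: the script to run and its arguments.
payload = {
    "file": "s3://livy-example/wordcount.py",
    "args": ["s3://livy-example/livy_life.txt"],
}
body = json.dumps(payload)
print(body)
```

The string in body can then be handed to curl via --data or to any other HTTP client.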

To monitor the progress of the job, there is also a directive to call: /batches/{batch_id}/state

Most probably, we first want to make sure that the job ran successfully. In all other cases, we need to find out what has happened to our job. The directive /batches/{batch_id}/log can help here to inspect the run.
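How the two directives could be combined is sketched below. Everything here is an assumption for illustration: the function name, the set of states counted as failures, and the shape of the log response (a log key holding a list of lines). The log lines in the example are invented.

```python
# Assumption: these states mean the batch will not finish successfully.
FAILED_STATES = {"dead", "killed", "error"}


def summarize_batch(state_response, log_response, tail=3):
    """Turn a /state and a /log response into a short status message."""
    state = state_response["state"]
    if state == "success":
        return "batch succeeded"
    if state in FAILED_STATES:
        # Show only the last few log lines, where the failure usually surfaces.
        last_lines = log_response.get("log", [])[-tail:]
        return f"batch failed ({state}):\n" + "\n".join(last_lines)
    return f"batch still in progress ({state})"


# Canned responses; the log lines are invented for illustration.
print(summarize_batch({"id": 1, "state": "running"}, {"id": 1, "log": []}))
print(summarize_batch(
    {"id": 1, "state": "dead"},
    {"id": 1, "log": ["line 1", "line 2", "line 3", "line 4"]},
))
```

In a real setup, both dictionaries would come from GET requests to the state and log directives of the batch.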

Finally, the batch is removed by:

curl -X DELETE http://livy-server:8998/batches/1 

which returns: {"msg":"deleted"} and we are done.

Trivia

  • AWS’ Hadoop cluster service EMR supports Livy natively as a Software Configuration option.
  • Apache’s notebook tool Zeppelin supports Livy as an Interpreter, i.e. you write code within the notebook and execute it directly against Livy REST API without handling HTTP yourself.
  • Be careful not to use Livy in every case when you want to query a Spark cluster: if you want to use Spark as a query backend and access data via Spark SQL, check out the Thrift server instead of building around Livy.
  • Kerberos can be integrated into Livy for authentication purposes.

About the author

Andre Münch

I am a data engineer at STATWORX. I love the challenge of setting up data structures and composing components to integrate data science models into production environments.
