Questions tagged [apache-spark]

Apache Spark is an open-source distributed data processing engine written in Scala that provides a unified API and distributed datasets for both batch and streaming processing. Use cases for Apache Spark are often related to machine/deep learning and graph processing.

422 votes
20 answers
377k views

Spark - repartition() vs coalesce()

According to Learning Spark: Keep in mind that repartitioning your data is a fairly expensive operation. Spark also has an optimized version of repartition() called coalesce() that allows avoiding ...
Praveen Sripati
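For reference, a minimal PySpark sketch contrasting the two calls (a local SparkSession and arbitrary partition counts are assumed):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(1000)

df8 = df.repartition(8)  # full shuffle; can increase or decrease the partition count
df2 = df8.coalesce(2)    # merges existing partitions; avoids a full shuffle when decreasing

print(df8.rdd.getNumPartitions(), df2.rdd.getNumPartitions())
```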
346 votes
14 answers
177k views

Difference between DataFrame, Dataset, and RDD in Spark

I'm just wondering what is the difference between an RDD and DataFrame (Spark 2.0.0 DataFrame is a mere type alias for Dataset[Row]) in Apache Spark? Can you convert one to the other?
oikonomiyaki
337 votes
26 answers
641k views

How to change dataframe column names in PySpark?

I come from pandas background and am used to reading data from CSV files into a dataframe and then simply changing the column names to something useful using the simple command: df.columns = ...
Shubhanshu Mishra
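A small hedged sketch of the usual PySpark renaming options (column names here are made up):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["_1", "_2"])

df = df.toDF("id", "letter")                   # rename all columns at once
df = df.withColumnRenamed("letter", "label")   # or rename one column at a time
df.printSchema()
```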
323 votes
17 answers
472k views

How to show full column content in a Spark Dataframe?

I am using spark-csv to load data into a DataFrame. I want to do a simple query and display the content: val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("my....
tracer
307 votes
17 answers
244k views

What is the difference between map and flatMap and a good use case for each?

Can someone explain to me the difference between map and flatMap and what is a good use case for each? What does "flatten the results" mean? What is it good for?
Eran Witkon
300 votes
2 answers
148k views

What are workers, executors, cores in Spark Standalone cluster?

I read Cluster Mode Overview and I still can't understand the different processes in the Spark Standalone cluster and the parallelism. Is the worker a JVM process or not? I ran the bin\start-slave.sh ...
Manikandan Kannan
292 votes
14 answers
436k views

Spark java.lang.OutOfMemoryError: Java heap space

My cluster: 1 master, 11 slaves, each node has 6 GB memory. My settings: spark.executor.memory=4g, Dspark.akka.frameSize=512 Here is the problem: First, I read some data (2.19 GB) from HDFS to RDD:...
Hellen
256 votes
11 answers
251k views

Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects

I'm getting strange behavior when calling a function outside of a closure: when the function is in an object everything works; when the function is in a class I get: Task not serializable: java.io....
Nimrod007
253 votes
9 answers
185k views

Apache Spark: The number of cores vs. the number of executors

I'm trying to understand the relationship of the number of cores and the number of executors when running a Spark job on YARN. The test environment is as follows: Number of data nodes: 3 Data node ...
zeodtr
244 votes
7 answers
168k views

What is the difference between cache and persist?

In terms of RDD persistence, what are the differences between cache() and persist() in spark ?
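A brief sketch of the two calls in PySpark (nothing beyond a local session is assumed):

```python
from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.range(100).cache()                                # uses the default storage level
df2 = spark.range(100).persist(StorageLevel.MEMORY_AND_DISK)  # persist lets you pick the level explicitly
```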
226 votes
22 answers
269k views

How to stop INFO messages displaying on spark console?

I'd like to stop various messages that are coming on spark shell. I tried to edit the log4j.properties file in order to stop these message. Here are the contents of log4j.properties # Define the ...
Vishwas
219 votes
1 answer
63k views

Spark performance for Scala vs Python

I prefer Python over Scala. But, as Spark is natively written in Scala, I was expecting my code to run faster in the Scala than the Python version for obvious reasons. With that assumption, I thought ...
Mrityunjay
217 votes
15 answers
573k views

Show distinct column values in pyspark dataframe

With pyspark dataframe, how do you do the equivalent of Pandas df['col'].unique(). I want to list out all the unique values in a pyspark dataframe column. Not the SQL type way (registertemplate then ...
Satya
215 votes
7 answers
217k views

Add JAR files to a Spark job - spark-submit

True... it has been discussed quite a lot. However, there is a lot of ambiguity and some of the answers provided ... including duplicating JAR references in the jars/executor/driver configuration or ...
YoYo
208 votes
4 answers
340k views

How to add a constant column in a Spark DataFrame?

I want to add a column in a DataFrame with some arbitrary value (that is the same for each row). I get an error when I use withColumn as follows: dt.withColumn('new_column', 10).head(5) -------------...
Evan Zamir
197 votes
10 answers
179k views

How to select the first row of each group?

I have a DataFrame generated as follows: df.groupBy($"Hour", $"Category") .agg(sum($"value") as "TotalValue") .sort($"Hour".asc, $"TotalValue".desc) The results look like: +----+--------+-------...
Rami
192 votes
5 answers
101k views

(Why) do we need to call cache or persist on a RDD

When a resilient distributed dataset (RDD) is created from a text file or collection (or from another RDD), do we need to call "cache" or "persist" explicitly to store the RDD data into memory? Or is ...
187 votes
10 answers
173k views

How to read multiple text files into a single RDD?

I want to read a bunch of text files from an HDFS location and perform mapping on them in an iteration using Spark. JavaRDD<String> records = ctx.textFile(args[1], 1); is capable of reading only ...
user3705662
185 votes
17 answers
194k views

How to turn off INFO logging in Spark?

I installed Spark using the AWS EC2 guide and I can launch the program fine using the bin/pyspark script to get to the Spark prompt and can also do the Quick Start guide successfully. However, I ...
horatio1701d
185 votes
18 answers
510k views

Concatenate columns in Apache Spark DataFrame

How do we concatenate two columns in an Apache Spark DataFrame? Is there any function in Spark SQL which we can use?
Nipun
184 votes
11 answers
512k views

How do I add a new column to a Spark DataFrame (using PySpark)?

I have a Spark DataFrame (using PySpark 1.5.1) and would like to add a new column. I've tried the following without any success: type(randomed_hours) # => list # Create in Python and transform ...
Boris
183 votes
11 answers
417k views

Convert spark DataFrame column to python list

I work on a dataframe with two columns, mvv and count. +---+-----+ |mvv|count| +---+-----+ | 1 | 5 | | 2 | 9 | | 3 | 3 | | 4 | 1 | I would like to obtain two lists containing the mvv values and ...
a.moussa
183 votes
23 answers
582k views

How can I change column types in Spark SQL's DataFrame?

Suppose I'm doing something like: val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true")) df.printSchema() root |-- year: string (nullable = true) ...
kevinykuo
181 votes
6 answers
418k views

How to sort by column in descending order in Spark SQL?

I tried df.orderBy("col1").show(10) but it sorted in ascending order. df.sort("col1").show(10) also sorts in ascending order. I looked on stackoverflow and the answers I found were ...
Vedom
180 votes
12 answers
501k views

Filter Pyspark dataframe column with None value

I'm trying to filter a PySpark dataframe that has None as a row value: df.select('dt_mvmt').distinct().collect() [Row(dt_mvmt=u'2016-03-27'), Row(dt_mvmt=u'2016-03-28'), Row(dt_mvmt=u'2016-03-29'),...
Ivan
180 votes
3 answers
82k views

How are stages split into tasks in Spark?

Let's assume for the following that only one Spark job is running at any point in time. What I get so far: here is what I understand happens in Spark: when a SparkContext is created, each ...
Make42
171 votes
18 answers
217k views

How to check if spark dataframe is empty?

Right now, I have to use df.count > 0 to check if the DataFrame is empty or not. But it is kind of inefficient. Is there any better way to do that? PS: I want to check if it's empty so that I only ...
auxdx
171 votes
16 answers
429k views

Write single CSV file using spark-csv

I am using https://github.com/databricks/spark-csv. I am trying to write a single CSV, but am not able to; it creates a folder instead. I need a Scala function which will take parameters like path and file name ...
user1735076
167 votes
9 answers
100k views

How to store custom objects in Dataset?

According to Introducing Spark Datasets: As we look forward to Spark 2.0, we plan some exciting improvements to Datasets, specifically: ... Custom encoders – while we currently autogenerate ...
167 votes
13 answers
271k views

How to set Apache Spark Executor memory

How can I increase the memory available to Apache Spark executor nodes? I have a 2 GB file that is suitable for loading into Apache Spark. I am running Apache Spark on one machine for the moment, so ...
WillamS
166 votes
4 answers
167k views

Apache Spark: map vs mapPartitions?

What's the difference between an RDD's map and mapPartitions method? And does flatMap behave like map or like mapPartitions? Thanks. (edit) i.e. what is the difference (either semantically or in ...
Nicholas White
165 votes
14 answers
536k views

Spark - load CSV file as DataFrame?

I would like to read a CSV in spark and convert it as DataFrame and store it in HDFS with df.registerTempTable("table_name") I have tried: scala> val df = sqlContext.load("hdfs:///csv/file/dir/...
Donbeo
164 votes
9 answers
396k views

How to delete columns in pyspark dataframe

>>> a DataFrame[id: bigint, julian_date: string, user_id: bigint] >>> b DataFrame[id: bigint, quan_created_money: decimal(10,0), quan_created_cnt: bigint] >>> a.join(b, a.id=...
xjx0524
159 votes
7 answers
419k views

How to change a dataframe column from String type to Double type in PySpark?

I have a dataframe with column as String. I wanted to change the column type to Double type in PySpark. Following is the way, I did: toDoublefunc = UserDefinedFunction(lambda x: x,DoubleType()) ...
Abhishek Choudhary
156 votes
12 answers
367k views

How to convert rdd object to dataframe in spark

How can I convert an RDD (org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]) to a Dataframe org.apache.spark.sql.DataFrame. I converted a dataframe to rdd using .rdd. After processing it I want it ...
user568109
151 votes
13 answers
337k views

Spark Dataframe distinguish columns with duplicated name

As far as I know, in a Spark Dataframe multiple columns can have the same name, as shown in the dataframe snapshot below: [ Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=...
resec
146 votes
8 answers
417k views

Sort in descending order in PySpark

I'm using PySpark (Python 2.7.9/Spark 1.3.1) and have a dataframe GroupObject which I need to filter & sort in the descending order. Trying to achieve it via this piece of code. group_by_dataframe....
rclakmal
145 votes
5 answers
204k views

How to define partitioning of DataFrame?

I've started using Spark SQL and DataFrames in Spark 1.4.0. I'm wanting to define a custom partitioner on DataFrames, in Scala, but not seeing how to do this. One of the data tables I'm working with ...
rake
138 votes
13 answers
415k views

Best way to get the max value in a Spark dataframe column

I'm trying to figure out the best way to get the largest value in a Spark dataframe column. Consider the following example: df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"]) df....
xenocyon
137 votes
10 answers
322k views

How to print the contents of RDD?

I'm attempting to print the contents of a collection to the Spark console. I have a type: linesWithSessionId: org.apache.spark.rdd.RDD[String] = FilteredRDD[3] And I use the command: scala> ...
blue-sky
137 votes
9 answers
334k views

How to overwrite the output directory in spark

I have a Spark Streaming application which produces a dataset every minute. I need to save/overwrite the results of the processed data. When I try to overwrite the dataset, the exception org.apache....
Vijay Innamuri
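A short sketch of the overwrite save mode (the output path is illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(10)

# mode("overwrite") replaces the target directory instead of raising an "already exists" error
df.write.mode("overwrite").parquet("/tmp/minute_output")
```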
136 votes
5 answers
300k views

How to kill a running Spark application?

I have a running Spark application that occupies all the cores, so my other applications won't be allocated any resources. I did some quick research and people suggested using YARN kill or /bin/...
B.Mr.W.
134 votes
20 answers
221k views

importing pyspark in python shell

This is a copy of someone else's question on another forum that was never answered, so I thought I'd re-ask it here, as I have the same issue. (See http://geekple.com/blogs/feeds/Xgzu7/posts/...
Glenn Strycker
134 votes
14 answers
417k views

Concatenate two PySpark dataframes

I'm trying to concatenate two PySpark dataframes with some columns that are only on one of them: from pyspark.sql.functions import randn, rand df_1 = sqlContext.range(0, 10) +--+ |id| +--+ | 0| | 1| ...
Ivan
134 votes
6 answers
462k views

Convert pyspark string to date format

I have a date pyspark dataframe with a string column in the format of MM-dd-yyyy and I am attempting to convert this into a date column. I tried: df.select(to_date(df.STRING_COLUMN).alias('new_date'))....
Jenks
133 votes
42 answers
390k views

Pyspark: Exception: Java gateway process exited before sending the driver its port number

I'm trying to run pyspark on my macbook air. When i try starting it up I get the error: Exception: Java gateway process exited before sending the driver its port number when sc = SparkContext() is ...
mt88
133 votes
4 answers
205k views

What is the difference between spark.sql.shuffle.partitions and spark.default.parallelism?

What's the difference between spark.sql.shuffle.partitions and spark.default.parallelism? I have tried to set both of them in SparkSQL, but the task number of the second stage is always 200.
Edison
130 votes
16 answers
154k views

Spark - Error "A master URL must be set in your configuration" when submitting an app

I have a Spark app which runs with no problem in local mode, but has some problems when submitted to the Spark cluster. The error messages are as follows: 16/06/24 15:42:06 WARN scheduler....
Shuai Zhang
128 votes
11 answers
78k views

Can Apache Spark run without Hadoop?

Are there any dependencies between Spark and Hadoop? If not, are there any features I'll miss when I run Spark without Hadoop?
tourist
127 votes
13 answers
422k views

Load CSV file with PySpark

I'm new to Spark and I'm trying to read CSV data from a file with Spark. Here's what I am doing: sc.textFile('file.csv') .map(lambda line: (line.split(',')[0], line.split(',')[1])) .collect() ...
Kernael
