재현 가능한 Apache Spark 예제를 만드는 방법
나는 pyspark 및 spark- dataframe 태그에 대한 몇 가지 질문을 읽는 데 상당한 시간을 소비했으며 포스터가 질문을 진정으로 이해하기에 충분한 정보를 제공하지 않는 경우가 많습니다. 나는 보통 그들에게 MCVE 를 게시하라고 요청 하지만 때때로 샘플 입력 / 출력 데이터를 보여 주도록하는 것은 이빨을 당기는 것과 같습니다. 예 : 이 질문 에 대한 의견을 참조하십시오 .
아마도 문제의 일부는 사람들이 스파크 데이터 프레임 용 MCVE를 쉽게 만드는 방법을 모른다는 것입니다. 이 팬더 질문 의 스파크 데이터 프레임 버전을 링크 할 수있는 가이드로 사용하는 것이 유용 할 것이라고 생각합니다 .
그렇다면 재현 가능한 좋은 예를 만드는 방법은 무엇입니까?
쉽게 다시 만들 수있는 작은 샘플 데이터를 제공합니다.
최소한 포스터는 데이터 프레임과 코드에 두 개의 행과 열을 제공해야 쉽게 만들 수 있습니다. 간단히 말해서 잘라서 붙여 넣기를 의미합니다. 문제를 보여주기 위해 가능한 한 작게 만드십시오.
다음 데이터 프레임이 있습니다.
+-----+---+-----+----------+
|index| X|label| date|
+-----+---+-----+----------+
| 1| 1| A|2017-01-01|
| 2| 3| B|2017-01-02|
| 3| 5| A|2017-01-03|
| 4| 7| B|2017-01-04|
+-----+---+-----+----------+
다음 코드로 만들 수 있습니다.
df = sqlCtx.createDataFrame(
[
(1, 1, 'A', '2017-01-01'),
(2, 3, 'B', '2017-01-02'),
(3, 5, 'A', '2017-01-03'),
(4, 7, 'B', '2017-01-04')
],
('index', 'X', 'label', 'date')
)
원하는 출력을 표시합니다.
구체적인 질문을하고 원하는 결과를 보여주세요.
어떻게 새 열 만들 수 있습니다 'is_divisible'
값을 가지고 'yes'
의 월의 일 경우 'date'
플러스 칠일이 열의 값으로 나누어'X'
, 그리고 'no'
그렇지?
원하는 출력 :
+-----+---+-----+----------+------------+
|index| X|label| date|is_divisible|
+-----+---+-----+----------+------------+
| 1| 1| A|2017-01-01| yes|
| 2| 3| B|2017-01-02| yes|
| 3| 5| A|2017-01-03| yes|
| 4| 7| B|2017-01-04| no|
+-----+---+-----+----------+------------+
출력을 얻는 방법을 설명하십시오.
원하는 결과를 얻는 방법을 자세히 설명하십시오. 계산 예를 보여주는 데 도움이됩니다.
예를 들어 1 행에서 X = 1이고 날짜 = 2017-01-01입니다. 현재까지 7 일을 더하면 2017-01-08이됩니다. 날짜는 8이고 8은 1로 나눌 수 있으므로 대답은 '예'입니다.
마찬가지로 마지막 행의 경우 X = 7이고 날짜 = 2017-01-04입니다. 날짜에 7을 더하면 해당 월의 날짜가 11이됩니다. 11 % 7은 0이 아니므로 대답은 '아니오'입니다.
기존 코드를 공유하십시오.
작동하지 않는 경우에도 모든 * 코드를 포함하여 수행했거나 시도한 작업을 보여주십시오 . 문제가 발생한 부분을 알려주고 오류가 발생하면 오류 메시지를 포함 해주세요.
(* 스파크 컨텍스트를 생성하기 위해 코드를 생략 할 수 있지만 모든 가져 오기를 포함해야합니다.)
7 일을 더한 새 열을 추가하는 방법을 알고 date
있지만 그 달의 날짜를 정수로 가져 오는 데 문제가 있습니다.
from pyspark.sql import functions as f
df.withColumn("next_week", f.date_add("date", 7))
버전 포함, 가져 오기 및 구문 강조 사용
- 이 답변의 자세한 내용은 desertnaut가 작성했습니다 .
성능 조정 게시물의 경우 실행 계획 포함
- 이 답변의 전체 세부 사항은 user8371915에 의해 작성되었습니다 .
- 컨텍스트에 대해 표준화 된 이름을 사용하는 데 도움이됩니다.
Spark 출력 파일 구문 분석
기타 참고 사항.
- 요청 방법 과 최소, 완전 및 검증 가능한 예제를 만드는 방법을 먼저 읽어 보십시오 .
- 위에 링크 된이 질문에 대한 다른 답변을 읽으십시오.
- 훌륭하고 설명적인 제목이 있어야합니다.
- 공손. SO의 사람들은 자원 봉사자이므로 친절하게 물어보십시오.
성능 조정
질문이 성능 조정과 관련된 경우 다음 정보를 포함하십시오.
실행 계획
확장 된 실행 계획 을 포함하는 것이 가장 좋습니다 . Python에서 :
df.explain(True)
Scala에서 :
df.explain(true)
또는 통계가있는 확장 된 실행 계획 . Python에서 :
print(df._jdf.queryExecution().stringWithStats())
Scala에서 :
df.queryExecution.stringWithStats
모드 및 클러스터 정보
mode
-local
,,client
`클러스터.- 클러스터 관리자 (해당하는 경우)-없음 (로컬 모드), 독립형, YARN, Mesos, Kubernetes.
- 기본 구성 정보 (코어 수, 실행기 메모리).
타이밍 정보
특히 분산되지 않은 애플리케이션을 포팅하거나 낮은 지연 시간을 예상 할 때 느린 것이 상대적입니다. 다양한 작업 및 단계에 대한 정확한 타이밍은 Spark UI ( sc.uiWebUrl
) jobs
또는 Spark REST UI 에서 검색 할 수 있습니다 .
컨텍스트에 표준 이름 사용
각 컨텍스트에 대해 확립 된 이름을 사용하면 문제를 신속하게 재현 할 수 있습니다.
sc
-SparkContext
.sqlContext
-SQLContext
.spark
-SparkSession
.
유형 정보 제공 ( Scala )
강력한 유형 추론은 Scala의 가장 유용한 기능 중 하나이지만 컨텍스트에서 벗어난 코드를 분석하기 어렵습니다. 컨텍스트에서 유형이 분명하더라도 변수에 주석을 추가하는 것이 좋습니다. 취하다
val lines: RDD[String] = sc.textFile("path")
val words: RDD[String] = lines.flatMap(_.split(" "))
위에
val lines = sc.textFile("path")
val words = lines.flatMap(_.split(" "))
일반적으로 사용되는 도구는 다음을 지원할 수 있습니다.
spark-shell
/ 스칼라 쉘사용하다
:t
scala> val rdd = sc.textFile("README.md") rdd: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24 scala> :t rdd org.apache.spark.rdd.RDD[String]
InteliJ 아이디어
Alt+ 사용=
좋은 질문 및 답변; 몇 가지 추가 제안 :
Spark 버전 포함
Spark는 1.x 시대만큼 빠르게 발전하지는 않지만 여전히 진화하고 있습니다. 항상 (그러나 특히 다소 오래된 버전을 사용하는 경우) 작업 버전을 포함하는 것이 좋습니다. 개인적으로 저는 항상 다음과 같이 답변 을 시작 합니다 .
spark.version
# u'2.2.0'
또는
sc.version
# u'2.2.0'
Including your Python version, too, is never a bad idea.
Include all your imports
If your question is not strictly about Spark SQL & dataframes, e.g. if you intend to use your dataframe in some machine learning operation, be explicit about your imports - see this question, where the imports were added in the OP only after extensive exchange in the (now removed) comments (and turned out that these wrong imports were the root cause of the problem).
Why is this necessary? Because, for example, this LDA
from pyspark.mllib.clustering import LDA
is different from this LDA:
from pyspark.ml.clustering import LDA
the first coming from the old, RDD-based API (formerly Spark MLlib), while the second one from the new, dataframe-based API (Spark ML).
Include code highlighting
OK, I'll confess this is subjective: I believe that PySpark questions should not be tagged as python
by default; the thing is, python
tag gives automatically code highlighting (and I believe this is a main reason for those who use it for PySpark questions). Anyway, if you happen to agree, and you still would like a nice, highlighted code, simply include the relevant markdown directive:
<!-- language-all: lang-python -->
somewhere in your post, before your first code snippet.
[UPDATE: I have requested automatic syntax highlighting for pyspark
and sparkr
tags - upvotes most welcome]
This small helper function might help to parse Spark output files into DataFrame:
PySpark:
from pyspark.sql.functions import *
def read_spark_output(file_path):
step1 = spark.read \
.option("header","true") \
.option("inferSchema","true") \
.option("delimiter","|") \
.option("parserLib","UNIVOCITY") \
.option("ignoreLeadingWhiteSpace","true") \
.option("ignoreTrailingWhiteSpace","true") \
.option("comment","+") \
.csv("file://{}".format(file_path))
# select not-null columns
step2 = t.select([c for c in t.columns if not c.startswith("_")])
# deal with 'null' string in column
return step2.select(*[when(~col(col_name).eqNullSafe("null"), col(col_name)).alias(col_name) for col_name in step2.columns])
Scala:
// read Spark Output Fixed width table:
def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
val step1 = spark.read
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", "|")
.option("parserLib", "UNIVOCITY")
.option("ignoreLeadingWhiteSpace", "true")
.option("ignoreTrailingWhiteSpace", "true")
.option("comment", "+")
.csv(filePath)
val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*)
val columns = step2.columns
columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null", col(c))))
}
Usage:
df = read_spark_output("file:///tmp/spark.out")
PS: For pyspark, eqNullSafe
is available from spark 2.3
.
참조 URL : https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-examples
'UFO ET IT' 카테고리의 다른 글
멀티 인덱스를 사용한 판다 플로팅 (0) | 2021.01.07 |
---|---|
sinon stub withArgs가 일부 인수와 일치하지만 모든 인수와 일치 할 수 있습니까? (0) | 2021.01.07 |
Python Tkinter 루트 창을 어떻게 제거합니까? (0) | 2021.01.07 |
VIM에서 특정 탭으로 전환 (0) | 2021.01.07 |
Beyond Compare에서 Delphi Form Files 버전을 비교하는 동안 특정 차이점을 무시하도록하려면 어떻게해야합니까? (0) | 2021.01.07 |