UFO ET IT

재현 가능한 Apache Spark 예제를 만드는 방법

ufoet 2021. 1. 7. 08:21
반응형

재현 가능한 Apache Spark 예제를 만드는 방법


나는 태그에 대한 몇 가지 질문을 읽는 데 상당한 시간을 소비했으며 포스터가 질문을 진정으로 이해하기에 충분한 정보를 제공하지 않는 경우가 많습니다. 나는 보통 그들에게 MCVE 를 게시하라고 요청 하지만 때때로 샘플 입력 / 출력 데이터를 보여 주도록하는 것은 이빨을 당기는 것과 같습니다. 예 : 이 질문 에 대한 의견을 참조하십시오 .

아마도 문제의 일부는 사람들이 스파크 데이터 프레임 용 MCVE를 쉽게 만드는 방법을 모른다는 것입니다. 이 팬더 질문 의 스파크 데이터 프레임 버전을 링크 할 수있는 가이드로 사용하는 것이 유용 할 것이라고 생각합니다 .

그렇다면 재현 가능한 좋은 예를 만드는 방법은 무엇입니까?


쉽게 다시 만들 수있는 작은 샘플 데이터를 제공합니다.

최소한 포스터는 데이터 프레임과 코드에 두 개의 행과 열을 제공해야 쉽게 만들 수 있습니다. 간단히 말해서 잘라서 붙여 넣기를 의미합니다. 문제를 보여주기 위해 가능한 한 작게 만드십시오.


다음 데이터 프레임이 있습니다.

+-----+---+-----+----------+
|index|  X|label|      date|
+-----+---+-----+----------+
|    1|  1|    A|2017-01-01|
|    2|  3|    B|2017-01-02|
|    3|  5|    A|2017-01-03|
|    4|  7|    B|2017-01-04|
+-----+---+-----+----------+

다음 코드로 만들 수 있습니다.

df = sqlCtx.createDataFrame(
    [
        (1, 1, 'A', '2017-01-01'),
        (2, 3, 'B', '2017-01-02'),
        (3, 5, 'A', '2017-01-03'),
        (4, 7, 'B', '2017-01-04')
    ],
    ('index', 'X', 'label', 'date')
)

원하는 출력을 표시합니다.

구체적인 질문을하고 원하는 결과를 보여주세요.


어떻게 새 열 만들 수 있습니다 'is_divisible' 값을 가지고 'yes' 의 월의 일 경우 'date' 플러스 칠일이 열의 값으로 나누어'X' , 그리고 'no' 그렇지?

원하는 출력 :

+-----+---+-----+----------+------------+
|index|  X|label|      date|is_divisible|
+-----+---+-----+----------+------------+
|    1|  1|    A|2017-01-01|         yes|
|    2|  3|    B|2017-01-02|         yes|
|    3|  5|    A|2017-01-03|         yes|
|    4|  7|    B|2017-01-04|          no|
+-----+---+-----+----------+------------+

출력을 얻는 방법을 설명하십시오.

원하는 결과를 얻는 방법을 자세히 설명하십시오. 계산 예를 보여주는 데 도움이됩니다.


예를 들어 1 행에서 X = 1이고 날짜 = 2017-01-01입니다. 현재까지 7 일을 더하면 2017-01-08이됩니다. 날짜는 8이고 8은 1로 나눌 수 있으므로 대답은 '예'입니다.

마찬가지로 마지막 행의 경우 X = 7이고 날짜 = 2017-01-04입니다. 날짜에 7을 더하면 해당 월의 날짜가 11이됩니다. 11 % 7은 0이 아니므로 대답은 '아니오'입니다.


기존 코드를 공유하십시오.

작동하지 않는 경우에도 모든 * 코드를 포함하여 수행했거나 시도한 작업을 보여주십시오 . 문제가 발생한 부분을 알려주고 오류가 발생하면 오류 메시지를 포함 해주세요.

(* 스파크 컨텍스트를 생성하기 위해 코드를 생략 할 수 있지만 모든 가져 오기를 포함해야합니다.)


7 일을 더한 새 열을 추가하는 방법을 알고 date 있지만 그 달의 날짜를 정수로 가져 오는 데 문제가 있습니다.

from pyspark.sql import functions as f
df.withColumn("next_week", f.date_add("date", 7))

버전 포함, 가져 오기 및 구문 강조 사용


성능 조정 게시물의 경우 실행 계획 포함

  • 이 답변의 전체 세부 사항은 user8371915에 의해 작성되었습니다 .
  • 컨텍스트에 대해 표준화 된 이름을 사용하는 데 도움이됩니다.

Spark 출력 파일 구문 분석

  • MaxU이 답변 에서 Spark 출력 파일을 DataFrame으로 구문 분석하는 데 도움 되는 유용한 코드를 제공 했습니다.

기타 참고 사항.


성능 조정

질문이 성능 조정과 관련된 경우 다음 정보를 포함하십시오.

실행 계획

확장 된 실행 계획 을 포함하는 것이 가장 좋습니다 . Python에서 :

df.explain(True) 

Scala에서 :

df.explain(true)

또는 통계가있는 확장 된 실행 계획 . Python에서 :

print(df._jdf.queryExecution().stringWithStats())

Scala에서 :

df.queryExecution.stringWithStats

모드 및 클러스터 정보

  • mode- local,, client`클러스터.
  • 클러스터 관리자 (해당하는 경우)-없음 (로컬 모드), 독립형, YARN, Mesos, Kubernetes.
  • 기본 구성 정보 (코어 수, 실행기 메모리).

타이밍 정보

특히 분산되지 않은 애플리케이션을 포팅하거나 낮은 지연 시간을 예상 할 때 느린 것이 상대적입니다. 다양한 작업 및 단계에 대한 정확한 타이밍은 Spark UI ( sc.uiWebUrl) jobs또는 Spark REST UI 에서 검색 할 수 있습니다 .

컨텍스트에 표준 이름 사용

각 컨텍스트에 대해 확립 된 이름을 사용하면 문제를 신속하게 재현 할 수 있습니다.

  • sc- SparkContext.
  • sqlContext- SQLContext.
  • spark- SparkSession.

유형 정보 제공 ( Scala )

강력한 유형 추론은 Scala의 가장 유용한 기능 중 하나이지만 컨텍스트에서 벗어난 코드를 분석하기 어렵습니다. 컨텍스트에서 유형이 분명하더라도 변수에 주석을 추가하는 것이 좋습니다. 취하다

val lines: RDD[String] = sc.textFile("path")
val words: RDD[String] = lines.flatMap(_.split(" "))

위에

val lines = sc.textFile("path")
val words = lines.flatMap(_.split(" "))

일반적으로 사용되는 도구는 다음을 지원할 수 있습니다.

  • spark-shell / 스칼라 쉘

    사용하다 :t

    scala> val rdd = sc.textFile("README.md")
    rdd: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24
    
    scala> :t rdd
    org.apache.spark.rdd.RDD[String]
    
  • InteliJ 아이디어

    Alt+ 사용=


좋은 질문 및 답변; 몇 가지 추가 제안 :

Spark 버전 포함

Spark는 1.x 시대만큼 빠르게 발전하지는 않지만 여전히 진화하고 있습니다. 항상 (그러나 특히 다소 오래된 버전을 사용하는 경우) 작업 버전을 포함하는 것이 좋습니다. 개인적으로 저는 항상 다음과 같이 답변시작 합니다 .

spark.version
# u'2.2.0'

또는

sc.version
# u'2.2.0'

Including your Python version, too, is never a bad idea.


Include all your imports

If your question is not strictly about Spark SQL & dataframes, e.g. if you intend to use your dataframe in some machine learning operation, be explicit about your imports - see this question, where the imports were added in the OP only after extensive exchange in the (now removed) comments (and turned out that these wrong imports were the root cause of the problem).

Why is this necessary? Because, for example, this LDA

from pyspark.mllib.clustering import LDA

is different from this LDA:

from pyspark.ml.clustering import LDA

the first coming from the old, RDD-based API (formerly Spark MLlib), while the second one from the new, dataframe-based API (Spark ML).


Include code highlighting

OK, I'll confess this is subjective: I believe that PySpark questions should not be tagged as python by default; the thing is, python tag gives automatically code highlighting (and I believe this is a main reason for those who use it for PySpark questions). Anyway, if you happen to agree, and you still would like a nice, highlighted code, simply include the relevant markdown directive:

<!-- language-all: lang-python -->

somewhere in your post, before your first code snippet.

[UPDATE: I have requested automatic syntax highlighting for pyspark and sparkr tags - upvotes most welcome]


This small helper function might help to parse Spark output files into DataFrame:

PySpark:

from pyspark.sql.functions import *

def read_spark_output(file_path):
    step1 = spark.read \
             .option("header","true") \
             .option("inferSchema","true") \
             .option("delimiter","|") \
             .option("parserLib","UNIVOCITY") \
             .option("ignoreLeadingWhiteSpace","true") \
             .option("ignoreTrailingWhiteSpace","true") \
             .option("comment","+") \
             .csv("file://{}".format(file_path))
    # select not-null columns
    step2 = t.select([c for c in t.columns if not c.startswith("_")])
    # deal with 'null' string in column
    return step2.select(*[when(~col(col_name).eqNullSafe("null"), col(col_name)).alias(col_name) for col_name in step2.columns])

Scala:

// read Spark Output Fixed width table:
def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
  val step1 = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "|")
    .option("parserLib", "UNIVOCITY")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .option("comment", "+")
    .csv(filePath)

  val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*)

  val columns = step2.columns
  columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null", col(c))))
}

Usage:

df = read_spark_output("file:///tmp/spark.out")

PS: For pyspark, eqNullSafe is available from spark 2.3.

참조 URL : https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-examples

반응형