
pyspark code cheat sheet

Week 1 word count example
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
spark_conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('Assignment_1v2')
spark = SparkSession.builder.config(conf = spark_conf).getOrCreate()
sc = spark.sparkContext
twitter_rdd = sc.textFile('twitter.txt')
counts = twitter_rdd.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
counts.collect()
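As a small follow-up, a minimal sketch (reusing the counts RDD built above) that sorts the pairs by frequency instead of collecting them unordered:

# Sort the (word, count) pairs by descending count and take the ten most frequent words
top_words = counts.sortBy(lambda pair: pair[1], ascending=False).take(10)
for word, freq in top_words:
    print(word, freq)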
Week 2 parallel search
bank_rdd1 = bank_rdd.map(lambda line: line.split(','))
header = bank_rdd1.first()
bank_rdd1 = bank_rdd1.filter(lambda row: row != header) #filter out header
bank_rdd1 = bank_rdd1.filter(lambda x: int(x[5]) > 1000 and int(x[5]) < 2000)
numPartitions = bank_rdd1.getNumPartitions()
print(f"Total partitions: {numPartitions}")
# glom(): Return an RDD created by coalescing all elements within each partition into a list
partitions = bank_rdd1.glom().collect()
for index, partition in enumerate(partitions):
    print(f'------ Partition {index}:')
    for record in partition:
        print(record)
result_max_balance = bank_rdd_4.max(key=lambda x: int(x[5]))
# Round-robin data partitioning
df_round = df.repartition(5)
# Range data partitioning
df_range = df.repartitionByRange(5, "balance")
# Hash data partitioning
column_hash = "education"
df_hash = df.repartition(column_hash)

Week 3 parallel join
from pyspark.sql.functions import broadcast
# Use broadcast function to specify the use of the BroadcastHashJoin algorithm
df_joined_broadcast = df_B.join(broadcast(df_A), df_A.id == df_B.id, how='inner')
df_dict_inner_summ = df_dictionary.join(df_summer, df_dictionary.Code == df_summer.Country, how='inner')

Week 4 parallel aggregation
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
#### Aggregate the dataset by 'Year' and count the total number of athletes using DataFrame
agg_attribute = 'Year'
df_count = df_events.groupby(agg_attribute).agg(F.count(agg_attribute).alias('Total'))
#### Aggregate the dataset by 'Year' and count the total number of athletes using Spark SQL
sql_count = spark.sql('''
SELECT year, count(*)
FROM sql_events
GROUP BY year
''')
# Cast string columns to numeric types before aggregating
df = df.withColumn('balance', col('balance').cast('integer'))
df = df.withColumn('Age', F.col('Age').cast(IntegerType()))
df = df.withColumn('Height', df.Height.cast(IntegerType()))
df.filter(df.Season == 'Winter') \
    .groupby('Country') \
    .agg(F.min('Height').alias('min_height'),
         F.avg('Height').alias('avg_height'),
         F.max('Height').alias('max_height')) \
    .sort('avg_height', ascending=False) \
    .show()

Week 5 spark ml
from pyspark.ml.feature import StringIndexer
df_ref = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed_transformer = indexer.fit(df_ref)
indexed = indexed_transformer.transform(df_ref)
indexed.show()
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import GBTClassifier
dc = DecisionTreeClassifier(featuresCol='features', labelCol='attack', maxDepth=10)
gbt = GBTClassifier(featuresCol='features', labelCol='attack', maxIter=10)  # maxIter value was truncated in the source; 10 is a placeholder
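The classifiers above are only constructed; here is a minimal sketch of how one of them might be trained and evaluated in a pipeline. df_train and the input column names 'duration' and 'src_bytes' are placeholders, not from the original sheet:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Assemble raw numeric columns into the 'features' vector expected by dc
# ('duration' and 'src_bytes' are assumed column names)
assembler = VectorAssembler(inputCols=['duration', 'src_bytes'], outputCol='features')
pipeline = Pipeline(stages=[assembler, dc])  # reuse the DecisionTreeClassifier defined above

train, test = df_train.randomSplit([0.8, 0.2], seed=42)  # df_train is an assumed labelled DataFrame
model = pipeline.fit(train)
predictions = model.transform(test)

# Evaluate with area under ROC, using the 'attack' label column as above
evaluator = BinaryClassificationEvaluator(labelCol='attack')
print(evaluator.evaluate(predictions))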
Kafka and spark structured streaming
from kafka import KafkaProducer, KafkaConsumer
from json import dumps, loads

def connect_kafka_producer():
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                                  value_serializer=lambda x: dumps(x).encode('ascii'),
                                  api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka.')
        print(str(ex))
    return _producer

def connect_kafka_consumer():
    _consumer = None
    try:
        _consumer = KafkaConsumer(topic,
                                  consumer_timeout_ms=10000,  # stop iteration if no message after 10 sec
                                  auto_offset_reset='latest',  # use 'earliest' to consume from the beginning of the topic
                                  bootstrap_servers=['localhost:9092'],
                                  value_deserializer=lambda x: loads(x.decode('ascii')),
                                  api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    return _consumer

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import explode, split, window, approx_count_distinct
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("Clickstream Analysis in Spark") \
    .getOrCreate()
topic = "clickstream"
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("subscribe", topic) \
    .load()

# File sink: write the processed stream to parquet with a checkpoint location
# (df_process is a streaming DataFrame prepared in earlier assignment steps)
query_file_sink_p = df_process \
    .writeStream.format('parquet') \
    .outputMode('append') \
    .option('path', 'process.parquet') \
    .option('checkpointLocation', 'parquet/linux_process_log/checkpoint') \
    .start()

# Windowed aggregation over the model predictions (predictions_dfm from earlier steps)
attack_count_dfm = predictions_dfm \
    .filter('prediction = 1') \
    .groupBy('machine', window(predictions_dfm.event_time, '2 minutes')) \
    .agg(approx_count_distinct('CMD_PID').alias('count')) \
    .select('machine', 'window', 'count') \
    .orderBy('machine', 'window')

# Console sink
query = df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Memory sink, queryable as the in-memory table 'process_attack_count'
query_p = attack_count_dfp \
    .writeStream \
    .queryName("process_attack_count") \
    .outputMode("complete") \
    .format("memory") \
    .trigger(processingTime='10 seconds') \
    .start()
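For completeness, a hedged sketch of parsing the raw Kafka value column into typed columns before aggregating, and of keeping the console query above running. The JSON schema and field names here are assumptions, not part of the original sheet:

from pyspark.sql.functions import from_json, col

# Illustrative schema for the JSON messages on the topic (field names are assumed)
schema = StructType([
    StructField('ts', TimestampType(), True),
    StructField('machine', StringType(), True),
    StructField('CMD_PID', StringType(), True)])

# Kafka delivers 'value' as bytes; cast to string and unpack the JSON payload
parsed_df = df.select(
        from_json(col('value').cast('string'), schema).alias('data')) \
    .select('data.*')

# Keep the driver alive until the console query above is stopped
query.awaitTermination()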