Working And Examples Of PySpark Collect


Introduction to PySpark collect

PySpark collect is an action in PySpark that is used to retrieve all the elements of an RDD or Data Frame from the worker nodes to the driver node. It is an operation used to fetch data from an RDD/Data Frame: it gathers the distributed data and brings it back to the driver node.


The collect operation returns the data to the driver as an array of Row objects, which can then be displayed or processed further in PySpark. Once the data is available on the driver, it can be looped over and displayed. Collect is intended for smaller data sets that fit in the driver's memory; collecting larger data sets can cause out-of-memory errors. Let's look at the collect operation in detail and try to understand its functionality.

The syntax for the collect function is:

cd = spark.sparkContext.parallelize(data1)
cd.collect()

Explanation:

cd: the RDD created from the data.

.collect(): the function used to collect the elements of the RDD.
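To see collect end to end, here is a minimal, self-contained sketch (assuming a local Spark installation; the session setup and the values are illustrative and not part of the original example):

from pyspark.sql import SparkSession

# Build a local session; in a notebook or spark-submit job, spark/sc usually already exist.
spark = SparkSession.builder.master("local[*]").appName("collect_demo").getOrCreate()
sc = spark.sparkContext

data1 = [10, 20, 30, 40]
cd = sc.parallelize(data1)   # the RDD made from the data
print(cd.collect())          # action: brings every element back to the driver -> [10, 20, 30, 40]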


Working of Collect in Pyspark

Let us see how the COLLECT operation works in PySpark:

Collect is an action that returns all the elements of the dataset or RDD of a PySpark application to the driver program. It is basically used to gather the data from the various nodes to the driver program, where it is then returned to the user for analysis.

Retrieving a huge data set this way can sometimes cause an out-of-memory issue during data collection.

This is a network-heavy action call: all the elements from the different nodes are sent to the driver memory, where the data is collected, so a lot of data movement happens during the collect operation. Since it is an action call in PySpark, every time it is called all the pending transformations are executed before the action itself.

It retrieves the elements in the form of Array[Row] to the driver program.
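Because collect is an action, any transformations defined before it stay lazy and only run when collect is called. A small hedged sketch (assuming the SparkContext sc used in the examples below; the RDD and lambdas are illustrative):

rdd = sc.parallelize(range(10))
doubled = rdd.map(lambda x: x * 2)          # no Spark job runs yet
evens = doubled.filter(lambda x: x % 4 == 0)
print(evens.collect())                      # the whole pipeline executes here -> [0, 4, 8, 12, 16]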

Let’s check the creation and usage with some coding examples.

Example of PySpark collect

Let us see some examples of how the PySpark collect operation works:

Let’s start by creating simple data in PySpark.

data1  = [{'Name':'Jhon','ID':2,'Add':'USA'},{'Name':'Joe','ID':3,'Add':'USA'},{'Name':'Tina','ID':2,'Add':'IND'},{'Name':'Jhon','ID':2, 'Add':'USA'},{'Name':'Joe','ID':5,'Add':'INA'}]

A sample data set is created with Name, ID, and Add as the fields.

a = sc.parallelize(data1)

The RDD is created using sc.parallelize.

b = spark.createDataFrame(a)
b.show()


Now let us try to collect the elements from the RDD.

a = sc.parallelize(data1)
a.collect()

This collects all the data back to the driver node, and the result is displayed on the console.


a.collect()[0]
a.collect()[1]
a.collect()[2]

The above code shows that we can also pick out individual elements from the collected result of an RDD/Data Frame by indexing into it. The index is used to retrieve a specific element from the result.
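Since collect on a DataFrame returns Row objects, individual fields can also be read by name or attribute once the rows are on the driver. A hedged sketch building on the DataFrame b created above:

rows = b.collect()                                 # list of Row objects on the driver
first = rows[0]
print(first['Name'], first['ID'], first['Add'])    # access fields by name
print([r.Name for r in rows])                      # or as attributes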


Let's try to understand this with one more example:

data2 = [1,2,3,4,5,6,7,8,9,10]
data3 = sc.parallelize(data2)
data3.collect()

This is a very simple way to understand collect: we made a simple RDD of type Int, and after collecting it, the data is returned to the driver's memory as the result. All the elements are brought back to the driver and displayed. Once the data is available locally, it can be used for further work such as data analysis and data modeling.
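When the data is too large to collect safely, take(n) or toLocalIterator() are the usual alternatives, since they avoid holding the whole dataset in driver memory at once. A hedged sketch (the RDD here is illustrative):

big = sc.parallelize(range(1_000_000))
print(big.take(5))                  # only the first 5 elements reach the driver
for x in big.toLocalIterator():     # streams the RDD to the driver one partition at a time
    pass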


These are some of the examples of the PySpark collect function.

Note:

COLLECT is an action in PySpark.

COLLECT collects the data back to the driver node.

PySpark COLLECT returns the type as Array[Row].

COLLECT brings the data back into the driver's memory, so collecting an excessive amount of data can cause memory issues.

PySpark COLLECT causes the movement of data over the network and brings it back to the driver memory.

COLLECTASLIST() (available in the Scala/Java Dataset API) collects the same data but returns the result as a List.

Conclusion

From the above article, we saw the use of the collect operation in PySpark. Through various examples and classification, we tried to understand how the COLLECT method works in PySpark and how it is used at the programming level.

Recommended Articles

This is a guide to PySpark collect. Here we discuss the use of the collect operation in PySpark with various examples and classification. You may also have a look at the following articles to learn more –


Working And Examples Of NumPy Norm

Introduction to NumPy norm

The error of a given model in machine learning and deep learning can be evaluated using a function called a norm, which can be thought of as the length of a vector: it maps the vector to a positive value. The length of the vector can be calculated using three vector norms, namely the vector L1 norm, the vector L2 norm, and the vector max norm. The vector L1 norm is the sum of the absolute vector values; the vector L2 norm is the square root of the sum of the squared vector values; and the vector max norm is the maximum absolute value of the vector. In this topic, we are going to learn about the NumPy norm.


Syntax

The syntax for NumPy norm in Python is as follows:

1. norm() function is used to calculate the L1 norm of the vector in NumPy, i.e. the sum of the absolute vector values, and the syntax for the same is as follows:

norm(arrayname, 1)

where arrayname is the name of the array whose L1 norm of the vector must be calculated, and the second argument specifies the norm order of the vector, which is 1 for the L1 norm.

2. norm() function is used to calculate the L2 norm of the vector in NumPy, i.e. the square root of the sum of the squared vector values, and the syntax for the same is as follows:

norm(arrayname)

where arrayname is the name of the array whose L2 norm of the vector must be calculated.

3. norm() function is used to calculate the max norm of the vector in NumPy, i.e. the maximum absolute value of the vector, and the syntax for the same is as follows:

norm(arrayname, inf)

where arrayname is the name of the array whose max norm must be calculated and inf represents infinity.

Working of NumPy norm

The error of a given model in machine learning and deep learning can be evaluated by using a function called a norm, which can be thought of as the length of the vector, mapping the vector to a positive value.

The length of the vector can be calculated using three vector norms, namely the vector L1 norm, the vector L2 norm, and the vector max norm.

The Vector L1 norm represents the L1 norm of the vector, which calculates the absolute vector values sum.

The Vector L2 norm represents the L2 norm of the vector, which calculates the squared vectored values sum and finds its square root.

The vector max norm is used to calculate the vector’s maximum value.

Examples of NumPy norm

Given below are the examples of NumPy norm:

Example #1

Python program to demonstrate the NumPy norm function to calculate the L1 norm of the vector of the newly created array:

Code:

# importing the package numpy and importing norm from numpy.linalg
import numpy as nump
from numpy.linalg import norm

# Creating an array by making use of the array function in NumPy and storing it in a variable called nameofthearray
nameofthearray = nump.array([1, 2, 3, 4])

# Displaying the elements of nameofthearray followed by one blank line by making use of \n
print('The elements of the given array are:')
print(nameofthearray)
print('\n')

# Using the norm function of NumPy, passing the created array along with 1 (the order of the norm)
# to find the L1 norm of the vector, and storing it in a variable called L1norm
L1norm = norm(nameofthearray, 1)

# Displaying the L1 norm of vector value stored in the L1norm variable
print('The L1 norm of vector value is:')
print(L1norm)

Output:

In the above program, the NumPy package is imported along with the norm function. Then an array is created using the array function in NumPy and stored in the variable called nameofthearray, and its elements are displayed. Then the norm() function is used to find the L1 norm of the vector by passing nameofthearray and the norm order 1 as parameters, and the result is stored in a variable called L1norm, which is printed as the output.

Example #2

Python program to demonstrate NumPy norm function to calculate the L2 norm of the vector of the newly created array:

# importing the package numpy and importing norm from numpy.linalg
import numpy as nump
from numpy.linalg import norm

# Creating an array by making use of the array function in NumPy and storing it in a variable called nameofthearray
nameofthearray = nump.array([1, 2, 3, 4])

# Displaying the elements of nameofthearray followed by one blank line by making use of \n
print('The elements of the given array are:')
print(nameofthearray)
print('\n')

# Using the norm function of NumPy and passing the created array as the parameter
# to find the L2 norm of the vector, storing it in a variable called L2norm
L2norm = norm(nameofthearray)

# Displaying the L2 norm of vector value stored in the L2norm variable
print('The L2 norm of vector value is:')
print(L2norm)

Output:

In the above program, the NumPy package is imported along with the norm function. Then an array is created using the array function in NumPy and stored in the variable called nameofthearray, and its elements are displayed. Then the norm() function is used to find the L2 norm of the vector by passing the nameofthearray array as the parameter, and the result is stored in a variable called L2norm, which is printed as the output.

Example #3

Python program to demonstrate NumPy norm function to calculate the maximum value of the vector of the newly created array:

Code:

# importing the package numpy, importing norm from numpy.linalg, and importing inf
import numpy as nump
from numpy.linalg import norm
from numpy import inf

# Creating an array by making use of the array function in NumPy and storing it in a variable called nameofthearray
nameofthearray = nump.array([1, 2, 3, 4])

# Displaying the elements of nameofthearray followed by one blank line by making use of \n
print('The elements of the given array are:')
print(nameofthearray)
print('\n')

# Using the norm function of NumPy and passing the created array along with inf
# to find the maximum value of the vector, storing it in a variable called mnorm
mnorm = norm(nameofthearray, inf)

# Displaying the maximum value of the vector stored in the mnorm variable
print('The maximum value of vector is:')
print(mnorm)

Output:

In the above program, the NumPy package is imported along with the norm function and inf. Then an array is created using the array function in NumPy and stored in the variable called nameofthearray, and its elements are displayed. Then the norm() function is used to find the maximum value of the vector by passing the nameofthearray array and inf as parameters, and the result is stored in a variable called mnorm, which is printed as the output.
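As a quick cross-check of the three norms described above, the following hedged sketch compares norm() with the plain formulas (the array values are illustrative):

import numpy as np
from numpy.linalg import norm
from numpy import inf

v = np.array([1, 2, 3, 4])
print(norm(v, 1), np.sum(np.abs(v)))        # L1 norm: 10.0 and 10
print(norm(v), np.sqrt(np.sum(v ** 2)))     # L2 norm: about 5.4772 for both
print(norm(v, inf), np.max(np.abs(v)))      # max norm: 4.0 and 4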

Recommended Articles

This is a guide to the NumPy norm. Here we discussed the concept of the NumPy norm function in Python through its definition, syntax, and working, along with programming examples and their outputs. You may also have a look at the following articles to learn more –

Learn The Internal Working Of Pyspark Foreach

Introduction to PySpark foreach

PySpark foreach is explained in this outline. foreach is an action available on DataFrames, RDDs, and Datasets in PySpark that iterates over each and every element in the dataset and applies a given function to it. The function runs once per element, and any effect it has (writing, printing, updating an accumulator) happens element by element; foreach itself does not return anything to the driver. foreachPartition works the same way but is applied to each partition of an RDD or DataFrame. We can create a function and pass it to foreach in PySpark to apply it over all the elements. This is an action operation in Spark used for data processing. In this topic, we are going to learn about PySpark foreach.


Syntax for PySpark foreach

The syntax for the PySpark foreach function is:

def function(x):
    ...

Dataframe.foreach(function)

def f(x):
    print(x)

b = a.foreach(f)


Working of PySpark foreach

Let us see how the foreach function works in PySpark:

The foreach function in PySpark works with each and every element in the Spark application: a function we define is applied to every element.

The loop is iterated for each and every element in Spark. The function is executed on each and every element of an RDD or DataFrame, and its work is carried out on the executors.

Every element in the loop is visited in turn, the given function is executed on it, and the action is performed; since foreach is an action with no return value, results are not sent back to the driver.

The foreach loop runs across the stages Spark builds for the action. It iterates over the items of the dataset: one item is taken at a time and the given function is applied to it.

The number of times the loop will iterate is equal to the length of the elements in the data.

If the data is not there or the list or data frame is empty the loop will not iterate.

The same can be applied with RDD, DataFrame, and Dataset in PySpark.
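Because foreach returns nothing, side effects such as an accumulator are the usual way to get information back from the loop. A minimal hedged sketch (the DataFrame and counter are illustrative, not from the article's examples that follow):

acc = spark.sparkContext.accumulator(0)
df = spark.createDataFrame(["SAM", "JOHN", "AND"], "string").toDF("Name")

def count_rows(row):
    acc.add(1)              # runs on the executors for every row

df.foreach(count_rows)      # the action itself returns None
print(acc.value)            # 3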

Example of PySpark foreach

Let us see some examples of how the PySpark foreach function works:

Create a DataFrame in PYSPARK:

Let’s first create a DataFrame in Python.

createDataFrame is used to create a DataFrame in Python:

a = spark.createDataFrame(["SAM","JOHN","AND","ROBIN","ANAND"], "string").toDF("Name")
a.show()

Now let's create a simple function that prints all the elements, which we will then pass to the foreach loop.

def f(x): print(x)

This is a simple Print function that prints all the data in a DataFrame.



Let’s iterate over all the elements using for Each loop.

b = a.foreach(f)

This is simple for Each Statement that iterates and prints through all the elements of a Data Frame.


Stages are defined and the action is performed.

Row(Name='ROBIN')
Row(Name='ANAND')
Row(Name='AND')
Row(Name='JOHN')
Row(Name='SAM')


a = spark.createDataFrame(["SAM","JOHN","AND","ROBIN","ANAND"], "string").toDF("Name")
b = a.foreach(print)

Example #2

Let us check the type of element inside a Data Frame. For This, we will proceed with the same DataFrame as created above and will try to pass a function that defines the type of variable inside.

Create a DataFrame in PYSPARK:-

Let’s first create a DataFrame in Python.

createDataFrame is used to create a DataFrame in Python:

a = spark.createDataFrame(["SAM","JOHN","AND","ROBIN","ANAND"], "string").toDF("Name")
a.show()


This function defines the type of the variable inside.

def f(x): print(type(x))

Let’s use ForEach Statement and print the type in the DataFrame.

b = a.foreach(f)

Output:

This will print the Type of every element it iterates.


We can also build a more complex function and pass it to the foreach loop in PySpark.

From the above examples, we saw the use of the foreach function with PySpark.

Note:

foreach is used to iterate over each and every element in a PySpark DataFrame or RDD.

We can pass any Python function that operates on each and every element of a DataFrame.

ForEach is an Action in Spark.

It doesn’t have any return value.
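foreachPartition, mentioned in the introduction, works the same way but hands the function an iterator over a whole partition, which is handy for per-partition setup such as opening one connection. A hedged sketch using the DataFrame a from the examples above:

def handle_partition(rows):
    batch = [row.Name for row in rows]    # rows is an iterator over one partition
    print(len(batch))                     # side effect on the executor; nothing is returned

a.foreachPartition(handle_partition)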

Conclusion

From the above article, we saw the use of foreach in PySpark. Through various examples and classification, we tried to understand how the foreach method works in PySpark and how it is used at the programming level.

Recommended Articles

We hope that this EDUCBA information on “PySpark foreach” was beneficial to you. You can view EDUCBA’s recommended articles for more information.

How Does Collect Function Work In Scala With Examples

Introduction to Scala collect

The collect function is used to collect elements from a given collection. It can be used with a collection data structure to pick out the elements that satisfy a given condition; that condition is expressed as a partial function. collect can be used with both mutable and immutable collection data structures in Scala, and it always returns a new collection containing all the elements that satisfy the partial function. In other words, it takes the partial function and applies it to all the values present inside the given collection.


We know that collect is a function available in Scala which takes a partial function as its parameter, applies it to the elements of the collection, and returns the elements that satisfy the condition.

Let’s see the syntax of this function.

This is the syntax as per the scala doc:

def collect[B](pf: PartialFunction[A, B]): Traversable[B]

mylistName.collect(Your_partial_function)

As you can see in the above lines of code, we can call this function on our collection object by specifying a partial function inside it as the parameter. So after calculating the values, it will return us a new collection with result values inside it.

How does collect Function Work in Scala?

We know that the collect function in Scala is used to collect elements of a particular type from a collection. We can call this function on any collection type, whether mutable or immutable, and it will always return a new collection holding the matching elements of the given collection.

Method signature:

def collect[B](pf: PartialFunction[A, B]): Traversable[B] (This is as per the scala documentation)

1. Partial Function

The collect function takes a partial function as its parameter. This partial function is responsible for processing each element of the collection; collect picks out all the elements from the given collection that satisfy it.

Syntax:

In the following lines of code, we define a partial function, specifying what it takes and what it returns. This is simple to use and readable as well.

val demo: PartialFunction[String, String] = { case mystr: String => mystr }

In the above lines of code, we are creating one partial function which will take up the string parameter and return us the string. In this way, we can specify our type what we want from the given collection.

2. Return Type

The collect function will always return a new collection, containing all the elements from the given collection that match the partial function.

Example:

object Main extends App{
  val mysequence: Seq[Any] = Seq("hello", "hello again", 40, "bye", 60, 80, 100, "i am string as well !!")
  val result = mysequence.collect{ case mystr: String => mystr }
  println(result)
}

It is just a simple program in which we create a sequence of 'Any' type in Scala and put elements of different types inside it, for instance strings and integers. But we want to extract only the string elements from the collection, so we call the collect method and pass a partial function that matches only strings. In this way, we can use the collect function in Scala.

Points to be remembered while using a collect function in scala:

This can be used with the mutable and immutable collection data structure.

This function takes a partial function as its argument; it evaluates that function and picks up the elements which satisfy the given condition. It always returns the result in the form of a new collection containing all the matching elements.

Examples of Scala collect

Given below are the examples of Scala collect:

Example #1

In this example, we are creating a sequence collection of any type. Using the collect function, we are trying to extract the integer, string, and double values from the given collection data structure in the scala.

object Main extends App{
  println("Demo to understand collect function in scala !!")
  println("Extract only string values ::")
  val mysequence1: Seq[Any] = Seq("hello", "hello again", 40, "bye", 60, 80, 100, "i am string as well !!")
  val result1 = mysequence1.collect{ case mystr: String => mystr }
  println("Result is ::")
  println(result1)
  println("***********************************************************************")
  println("Extract only integer values ::")
  val mysequence2: Seq[Any] = Seq("Amit", 200, 20.1, "sumit", 300, 30.2, "Geet", 400, 40.1, "vinit", 500, 50.1)
  val result2 = mysequence2.collect{ case myint: Int => myint }
  println("Result is ::")
  println(result2)
  println("***********************************************************************")
  println("Extract only double values ::")
  val mysequence3: Seq[Any] = Seq("Amit", 200, 20.1, "sumit", 300, 30.2, "Geet", 400, 40.1, "vinit", 500, 50.1)
  val result3 = mysequence3.collect{ case mydouble: Double => mydouble }
  println("Result is ::")
  println(result3)
}

Output:

Example #2

In this example, we are trying to fetch the float values from the collection. Here also we use the collect function to extract the elements; we are not using any external library, since this is a built-in function available in Scala.

Code:

object Main extends App{
  println("Demo to understand collect function in scala !!")
  println("Extract only float values ::")
  val mysequence1: Seq[Any] = Seq(2.1f, "test 1", "test2", 5.7f, "test 3", "test 4", "tets 5", 8890, 1.0f, 8.9f, 2.4f)
  val result1 = mysequence1.collect{ case myfloat: Float => myfloat }
  println("Result is ::")
  println(result1)
  println("***********************************************************************")
}

Output:

Conclusion

By using the collect function, we can easily extract data of a particular type from a collection. Also, we can use this function with any mutable or immutable collection data structure, which makes it easy and handy to use.

Recommended Articles

We hope that this EDUCBA information on “Scala collect” was beneficial to you. You can view EDUCBA’s recommended articles for more information.

Working Of Md5() Functions In Php With Examples

Introduction to PHP md5()

The md5() function of the PHP programming language produces the hash of a string, which is similar to an encoding process. The function has been available since PHP 4 and works on PHP 4, PHP 5, and PHP 7. md5() is generally not recommended for securely storing passwords, because its built-in hashing algorithm is very fast, which makes brute-force attacks easier. It accepts only two parameters, of which only one is mandatory.


Syntax:

String md5 ($string, $getRawOutput)

Explanation of Parameters in brief:

MD5() function of the PHP Programming Language takes two parameters at max. They are: $string parameter and $getRawOutput parameter.

$string: $string parameter will help us to expect the string to be hashed.

$getRawOutput: $getRawOutput parameter will help us to expect a Boolean value. For the TRUE result the function is going to return the HASH in raw binary format which is of the length 16.

Return type: The md5() function returns the hashed string: either a 32-character lowercase hexadecimal number, or, when raw output is requested, a raw binary string of length 16.

How do MD5() Functions work in PHP?

The md5() function of PHP works on PHP 4, PHP 5, and PHP 7 (and later versions). It is a built-in function: calling md5() runs PHP's internal MD5 hashing algorithm, which converts the given string value into its hash. It is very helpful in the encoding process. The md5() value is always a 32-character hexadecimal string unless the second parameter is used, in which case md5() returns a 16-byte raw binary string.

Examples to Implement PHP md5()

Below are the examples:

Example #1

Code:

<?php
$str1 = 'apples';
print "This is the value of HASH of apples :: ";
$a1 = md5($str1);
if (md5($str1) === '1f3870be274f6c49b3e31a0c6728957f') {
    echo "If the value of apples is :: 1f3870be274f6c49b3e31a0c6728957f then it will print :: ";
} else {
}

Output:

Example #2

Code:

<?php
$input_string1 = 'Pavan Kumar Sake';
echo '16 bit binary format :: ';
$i1 = md5($input_string1, TRUE);
echo $i1;

Output:

Example #3

Code:

<?php
$k = 10;
for ($i = 0; $i <= $k; $i++) {
    print "Hash code of $i :: ";
    print md5($i);
}

Example #4

Code:

<?php
$user1 = "Pavan Kumar Sake";
$pass1 = "pavansake123";
$user1_encode = md5($user1);
$pass1_encode = md5($pass1);
if (md5($user1) == "4c13476f5dd387106a2a629bf1a9a4a7") {
    if (md5($pass1) == "20b424c60b8495fae92d450cd78eb56d") {
        echo "Password is also correct so login will be successful";
    } else {
        echo "Incorrect Password is entered";
    }
} else {
    echo "Incorrect Username is entered";
}

Output:

Conclusion

I hope you now understand the definition of the PHP md5() function, its syntax and parameters, how md5() works in PHP, and the various examples used to illustrate the concept.

Recommended Articles

This is a guide to PHP MD5(). Here we discuss the introduction, syntax, and working of MD5() in PHP along with different examples and code implementation. You can also go through our other related articles to learn more –

Spotify Recommendation System Using Pyspark And Kafka Streaming

This article was published as a part of the Data Science Blogathon

Introduction

We all love listening to our favorite music every day. It is pretty hard to find songs that match our taste, and we would love a system to do this for us. Music applications like Spotify use content-based and collaborative filtering to recommend songs similar to what we like. In this article, I would like to show you how to implement a content-based music recommendation system that takes songs from our Liked Songs playlist and recommends similar songs from a streaming data source. To carry out this process we use Kafka to stream the data, PySpark data frames and Spark SQL for the Spark operations, and Streamlit to visualize everything. We also use MLlib for K-Means and PCA analysis.

Streaming data using Kafka

Initially, we have a CSV file that contains all of our song data: each row has the name, the artists, and the audio features associated with the song. I used a dataset that has details of over 5 lakh (500,000) songs available on Spotify. The dataset is available here:

Put this CSV in the same directory where you will run the Kafka producer code. Now use the following code to get the CSV data onto the stream. This is the Kafka producer code, which streams the data by sending it to a topic we specify; in the consumer code, we can retrieve this data using the same topic.

import pandas as pd
from kafka import KafkaProducer
from datetime import datetime
import time
import random
import numpy as np

# pip install kafka-python

KAFKA_TOPIC_NAME_CONS = "songTopic"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'localhost:9092'

if __name__ == "__main__":
    print("Kafka Producer Application Started ... ")

    kafka_producer_obj = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS_CONS,
                                       value_serializer=lambda x: x.encode('utf-8'))

    filepath = "tracks.csv"  # This is the csv which has Spotify data.
    songs_df = pd.read_csv(filepath)

    # We use this filter to get popular songs streaming. This can be tuned based on your interest.
    songs_df['order_id'] = np.arange(len(songs_df))
    songs_df['artists'] = songs_df['artists'].str.replace('[^a-zA-Z]', '')
    songs_df['id_artists'] = songs_df['id_artists'].str.replace('[^a-zA-Z]', '')
    # Some pre-processing performed for clean data.

    song_list = songs_df.to_dict(orient="records")

    message_list = []
    message = None
    for message in song_list:
        message_fields_value_list = []
        message_fields_value_list.append(message["order_id"])
        message_fields_value_list.append(message["id"])
        message_fields_value_list.append(message["name"])
        message_fields_value_list.append(message["popularity"])
        message_fields_value_list.append(message["duration_ms"])
        message_fields_value_list.append(message["explicit"])
        message_fields_value_list.append(message["artists"])
        message_fields_value_list.append(message["id_artists"])
        message_fields_value_list.append(message["release_date"])
        message_fields_value_list.append(message["danceability"])
        message_fields_value_list.append(message["energy"])
        message_fields_value_list.append(message["key"])
        message_fields_value_list.append(message["loudness"])
        message_fields_value_list.append(message["mode"])
        message_fields_value_list.append(message["speechiness"])
        message_fields_value_list.append(message["acousticness"])
        message_fields_value_list.append(message["instrumentalness"])
        message_fields_value_list.append(message["liveness"])
        message_fields_value_list.append(message["valence"])
        message_fields_value_list.append(message["tempo"])
        message_fields_value_list.append(message["time_signature"])

        message = ','.join(str(v) for v in message_fields_value_list)
        print("Message Type: ", type(message))
        print("Message: ", message)
        kafka_producer_obj.send(KAFKA_TOPIC_NAME_CONS, message)
        time.sleep(1)

    print("Kafka Producer Application Completed. ")

This is the code for the producer and we stream data after running this code. You can see an output like this if the code runs properly on your IDE:

Message: 0,3BFRqZFLSrqtQr6cjHbAxU,Ain't Misbehavin',51,237773,0,FatsWaller,DYWCXTkNqGFZIfSrWEa,1926,0.515,0.222,0,-16.918,0,0.0575,0.821,0.00193,0.19,0.35,98.358,4
Message: 1,61znp1Iy11bdJ2YAbwaqw7,Sing, Sing, Sing,51,520133,0,BennyGoodman,pBuKaLHJlIlqYxQQaflve,1928,0.626,0.744,2,-9.189,0,0.0662,0.847,0.892,0.145,0.259,113.117,4
Message: 2,0RNxWy0PC3AyH4ThH3aGK6,Mack the Knife,55,201467,0,LouisArmstrong,eLuQmkaCobbVDHceek,1929,0.673,0.377,0,-14.141,1,0.0697,0.586,0.0,0.332,0.713,88.973,4
# So on…
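Before wiring the stream into Spark, a plain kafka-python consumer is a quick way to confirm that messages really arrive on the topic. This is a hedged sketch and not part of the original project code:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "songTopic",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="latest",
    value_deserializer=lambda v: v.decode("utf-8"),
)
for message in consumer:
    print(message.value)    # one comma-separated song record per Kafka message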

PySpark consumer for streaming data

So, we saw how the data is streamed using Kafka. We had used “songTopic” as the topic name. In the consumer code, we use this same topic name to retrieve the data which is streamed from the producer.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Normalizer, StandardScaler
import random
import time

kafka_topic_name = "songTopic"
kafka_bootstrap_servers = 'localhost:9092'

spark = SparkSession \
    .builder \
    .appName("Spotify Streaming Reccomendation System") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

Here, we have built a basic spark session and also initialized the topic from which we retrieve the streaming data.

songs_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .option("startingOffsets", "latest") \
    .load()

songs_df1 = songs_df.selectExpr("CAST(value AS STRING)", "timestamp")

songs_schema_string = "order_id INT, id STRING, name STRING, popularity INT, duration_ms DOUBLE, explicit INT, " \
    + "artists STRING, id_artists STRING, release_date STRING, " \
    + "danceability DOUBLE, " \
    + "energy DOUBLE, key INT, loudness DOUBLE, " \
    + "mode INT, " \
    + "speechiness DOUBLE, " \
    + "acousticness DOUBLE, instrumentalness DOUBLE, liveness DOUBLE, " \
    + "valence DOUBLE, tempo DOUBLE, time_signature DOUBLE"

songs_df2 = songs_df1 \
    .select(from_csv(col("value"), songs_schema_string)
            .alias("song"), "timestamp")

songs_df3 = songs_df2.select("song.*", "timestamp")

Spark SQL View

Next, we create the schema for our streaming data. We create it in such a way that it matches the data coming from our producer. The schema is created with Spark SQL and we finally add a timestamp to each row as data arrives.

songs_df3.createOrReplaceTempView("song_find")
song_find_text = spark.sql("SELECT * FROM song_find")

songs_agg_write_stream = song_find_text \
    .writeStream \
    .trigger(processingTime='5 seconds') \
    .outputMode("append") \
    .option("truncate", "false") \
    .format("memory") \
    .queryName("testedTable5") \
    .start()

songs_agg_write_stream.awaitTermination(1)

Finally, we create an SQL View so that the data streaming can be put into a View and written to memory. We use a processing time of 5 seconds in append mode to get all the data incoming from the producer.

Favorite song data generated using Spotify API

import random
import pandas as pd
from spotify_api import getSong

song_data = getSong.passs()
#song_data.rename(columns={'duration_s': 'duration_ms' }, inplace=True)
song_data = song_data.drop(['id', 'added_at', 'time_signature', 'duration_s'], axis='columns')
rand_n = random.randint(0, len(song_data)-1)
add_df = song_data.head(rand_n)[-1:]

This code retrieves a random song from our Spotify Liked Songs playlist. This is an abstraction: the real Spotify API code is implemented in a different Python file, so feel free to add the following Python file:

#!/usr/bin/env python
# coding: utf-8

import os
#import my_spotify_credentials as credentials
import numpy as np
import pandas as pd
import ujson
import spotipy
import spotipy.util
import seaborn as sns

# fill your credentials here.
os.environ["SPOTIPY_CLIENT_ID"] = ''
os.environ["SPOTIPY_CLIENT_SECRET"] = ''
os.environ["SPOTIPY_REDIRECT_URI"] = ''

scope = 'user-library-read'
username = ''
token = spotipy.util.prompt_for_user_token(username, scope)

if token:
    spotipy_obj = spotipy.Spotify(auth=token)
    saved_tracks_resp = spotipy_obj.current_user_saved_tracks(limit=50)
else:
    print("Couldn't get token for that username")

number_of_tracks = saved_tracks_resp['total']
print('%d tracks' % number_of_tracks)

def save_only_some_fields(track_response):
    return {
        'id': str(track_response['track']['id']),
        'name': str(track_response['track']['name']),
        'artists': [artist['name'] for artist in track_response['track']['artists']],
        'duration_ms': track_response['track']['duration_ms'],
        'popularity': track_response['track']['popularity'],
        'added_at': track_response['added_at']
    }

tracks = [save_only_some_fields(track) for track in saved_tracks_resp['items']]

while saved_tracks_resp['next']:
    saved_tracks_resp = spotipy_obj.next(saved_tracks_resp)
    tracks.extend([save_only_some_fields(track) for track in saved_tracks_resp['items']])

tracks_df = pd.DataFrame(tracks)
pd.set_option('display.max_rows', len(tracks))

tracks_df['artists'] = tracks_df['artists'].apply(lambda artists: artists[0])
tracks_df['duration_ms'] = tracks_df['duration_ms'].apply(lambda duration: duration/1000)
tracks_df = tracks_df.rename(columns = {'duration_ms':'duration_s'})

audio_features = {}
for idd in tracks_df['id'].tolist():
    audio_features[idd] = spotipy_obj.audio_features(idd)[0]

tracks_df['acousticness'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['acousticness'])
tracks_df['speechiness'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['speechiness'])
tracks_df['key'] = tracks_df['id'].apply(lambda idd: str(audio_features[idd]['key']))
tracks_df['liveness'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['liveness'])
tracks_df['instrumentalness'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['instrumentalness'])
tracks_df['energy'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['energy'])
tracks_df['tempo'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['tempo'])
tracks_df['time_signature'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['time_signature'])
tracks_df['loudness'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['loudness'])
tracks_df['danceability'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['danceability'])
tracks_df['valence'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['valence'])

class getSong():
    def __init__(self):
        super(getSong, self).__init__()

    def passs():
        return tracks_df

Using this we can now get a random favorite song from the Spotify playlist.

Feature Engineering

df = spark.sql("SELECT * FROM testedTable5")
df = df.sort(df.release_date.desc())
df_stream = df
df = df.drop('order_id', 'id', 'explicit', 'mode', 'release_date', 'id_artists',
             'time_signature', 'duration_ms', 'timestamp')

df_sp = spark.createDataFrame(add_df)
df = df.union(df_sp)

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=[
    'danceability', 'energy', 'loudness', 'speechiness', 'acousticness',
    'instrumentalness', 'liveness', 'valence', 'tempo'], outputCol='features')

assembled_data = assembler.setHandleInvalid("skip").transform(df)

We first drop the unwanted columns from our Spark data frame and append our favorite-song data to it using Spark's union operation. It is important to gather all the numerical features into a single 'features' column, for which we use VectorAssembler from the pyspark.ml.feature library. The resulting assembled_data is a data frame with the feature vector attached alongside all the other columns.

from pyspark.ml.feature import StandardScaler

scale = StandardScaler(inputCol='features', outputCol='standardized')
data_scale = scale.fit(assembled_data)
df = data_scale.transform(assembled_data)

We use a standard scaler to scale the features column we generated earlier, so that the scaled column can be further used to perform K-Means clustering.

K-Means Clustering

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

silhouette_score = []
evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='standardized',
                                metricName='silhouette', distanceMeasure='squaredEuclidean')

KMeans_algo = KMeans(featuresCol='standardized', k=3)
KMeans_fit = KMeans_algo.fit(df)
output_df = KMeans_fit.transform(df)

Now, this is a step that we can skip if we want. But when a very large amount of data is coming in, it is better to perform K-Means first so that the data is clustered, and then run the recommendation only on the cluster that our favorite song falls into. We use MLlib to perform the clustering. We can also perform a PCA analysis using MLlib; we found that the numerical features used here capture up to about 90% of the variance, so all of the features are included.
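For reference, the PCA check mentioned above could look roughly like this with MLlib; this is a hedged sketch (only the 'standardized' column name comes from the pipeline above, and k is illustrative):

from pyspark.ml.feature import PCA

pca = PCA(k=3, inputCol='standardized', outputCol='pca_features')
pca_model = pca.fit(df)                  # df is the scaled data frame from the previous step
print(pca_model.explainedVariance)       # fraction of variance captured by each component
df_pca = pca_model.transform(df)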

Recommendation system code

import numpy as np, pandas as pd
import matplotlib.pyplot as plt, seaborn as sns
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")

class SpotifyRecommender():
    def __init__(self, rec_data):
        self.rec_data_ = rec_data

    def spotify_recommendations(self, song_name, amount=1):
        distances = []
        # get details of our fav song from the name we pass as x later.
        song = self.rec_data_[(self.rec_data_.name.str.lower() == song_name.lower())].head(1).values[0]
        # dropping the data with our fav song so that it doesn't affect our recommendation.
        res_data = self.rec_data_[self.rec_data_.name.str.lower() != song_name.lower()]
        # tqdm is just used for showing the progress bar while iterating through our streamed songs.
        for r_song in tqdm(res_data.values):
            dist = 0
            for col in np.arange(len(res_data.columns)):
                # indices of non-numerical columns needn't be considered.
                if not col in [0, 1, 13]:
                    # calculating the Manhattan distance for each numerical feature
                    dist = dist + np.absolute(float(song[col]) - float(r_song[col]))
            # distances are calculated, appended, and later added to a new column called distance in our dataset.
            distances.append(dist)
        res_data['distance'] = distances
        # sorting our data to be ascending by the 'distance' feature
        res_data = res_data.sort_values('distance')
        # the resulting dataset has the songs closest to our fav song's numerical values and is thus recommended.
        columns = ['name', 'artists', 'acousticness', 'liveness', 'instrumentalness',
                   'energy', 'danceability', 'valence']
        return res_data[columns][:amount]

datad = output_df.select('name', 'artists', 'danceability', 'energy', 'key', 'loudness',
                         'speechiness', 'acousticness', 'instrumentalness', 'liveness',
                         'valence', 'tempo', 'prediction')
datf = datad.toPandas()

datf.drop(datf[datf['artists'] == '0'].index, inplace=True)
datf.drop_duplicates(inplace=True)
datf.drop(datf[datf['danceability'] == 0.0000].index, inplace=True)
datf.drop(datf[datf['liveness'] == 0.000].index, inplace=True)
datf.drop(datf[datf['instrumentalness'] == 0.000000].index, inplace=True)
datf.drop(datf[datf['energy'] == 0.0000].index, inplace=True)
datf.drop(datf[datf['danceability'] == 0.000].index, inplace=True)
datf.drop(datf[datf['valence'] == 0.000].index, inplace=True)

y = datf
value_pred = datf.iloc[-1:]['prediction']
#datf = datf[datf['prediction'] == list(value_pred)[0]]

recommender = SpotifyRecommender(datf)
x = add_df['name'].tolist()[0]
rec_song = recommender.spotify_recommendations(x, 10)

v = add_df[['name', 'artists', 'acousticness', 'liveness', 'instrumentalness',
            'energy', 'danceability', 'valence']]
rec_song = pd.concat([rec_song, v])
rec_song.to_csv('rec_song.csv')

df_rec = spark.createDataFrame(rec_song)
df_rec.show()

|               name|             artists|acousticness|liveness|instrumentalness|energy|danceability|valence|
|  Tennessee Whiskey|      ChrisStapleton|       0.205|  0.0821|          0.0096|  0.37|       0.392|  0.512|
|            Element|            PopSmoke|      0.0301|   0.251|         2.18E-6| 0.878|       0.772|  0.305|
|              River|        BishopBriggs|       0.302|  0.0579|         2.97E-6| 0.477|       0.733|  0.545|
|          Edelweiss| BillLeeCharmianCarr|       0.785|   0.126|         4.64E-4| 0.156|       0.233|  0.354|
|            Cradles|            SubUrban|        0.27|   0.179|         6.48E-5| 0.585|       0.581|   0.63|
| Make You Feel My …|               Adele|       0.907|   0.105|         3.83E-4| 0.172|       0.317| 0.0963|
|              Lover|         TaylorSwift|       0.492|   0.118|         1.58E-5| 0.543|       0.359|  0.453|
|               SAD!|        XXXTENTACION|       0.258|   0.123|         0.00372| 0.613|        0.74|  0.473|
| I Got It Bad And …|   OscarPetersonTrio|       0.971|  0.0882|           0.911|0.0527|       0.488|  0.193|
|     Sweet Caroline|         NeilDiamond|       0.611|   0.237|         1.09E-4| 0.127|       0.529|  0.578|
|  Naina – Lofi Flip|        Mrunal Meena|        0.72|   0.299|           0.897| 0.258|       0.641|  0.321|

This is the final recommendation based on the song we provide. To check how good it is, I passed "Naina – Lofi Flip" as my favorite song; it is a slow Hindi track, and most of the recommended songs were slow songs with similar attributes. To visualize everything, we can use Streamlit.
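The ranking above is driven by the Manhattan (L1) distance between the numerical audio features of two songs; a tiny hedged sketch of that distance on made-up feature vectors:

import numpy as np

fav = np.array([0.72, 0.258, 0.897])          # e.g. danceability, energy, instrumentalness
candidate = np.array([0.641, 0.321, 0.911])
manhattan = np.sum(np.abs(fav - candidate))   # smaller distance means a more similar song
print(manhattan)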

Visualization using Streamlit

Just run streamlit using this command:

streamlit run dashboard.py

The code for the dashboard is here:

import pandas as pd
import numpy as np
import streamlit as st
import plotly.graph_objects as go
import plotly.express as px
import os
# import my_spotify_credentials as credentials
import ujson
import spotipy
import spotipy.util
import seaborn as sns

# fill credentials here.
os.environ["SPOTIPY_CLIENT_ID"] = ''
os.environ["SPOTIPY_CLIENT_SECRET"] = ''
os.environ["SPOTIPY_REDIRECT_URI"] = ''

scope = 'user-library-read'
username = ''
token = spotipy.util.prompt_for_user_token(username, scope)

if token:
    spotipy_obj = spotipy.Spotify(auth=token)
    saved_tracks_resp = spotipy_obj.current_user_saved_tracks(limit=50)
else:
    print("Couldn't get token for that username")

number_of_tracks = saved_tracks_resp['total']
print('%d tracks' % number_of_tracks)

def save_only_some_fields(track_response):
    return {
        'id': str(track_response['track']['id']),
        'name': str(track_response['track']['name']),
        'artists': [artist['name'] for artist in track_response['track']['artists']],
        'duration_ms': track_response['track']['duration_ms'],
        'popularity': track_response['track']['popularity'],
        'added_at': track_response['added_at']
    }

tracks = [save_only_some_fields(track) for track in saved_tracks_resp['items']]

while saved_tracks_resp['next']:
    saved_tracks_resp = spotipy_obj.next(saved_tracks_resp)
    tracks.extend([save_only_some_fields(track) for track in saved_tracks_resp['items']])

tracks_df = pd.DataFrame(tracks)
pd.set_option('display.max_rows', len(tracks))
#pd.reset_option('display.max_rows')

tracks_df['artists'] = tracks_df['artists'].apply(lambda artists: artists[0])
tracks_df['duration_ms'] = tracks_df['duration_ms'].apply(lambda duration: duration/1000)
tracks_df = tracks_df.rename(columns = {'duration_ms':'duration_s'})

audio_features = {}
for idd in tracks_df['id'].tolist():
    audio_features[idd] = spotipy_obj.audio_features(idd)[0]

tracks_df['acousticness'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['acousticness'])
tracks_df['speechiness'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['speechiness'])
tracks_df['key'] = tracks_df['id'].apply(lambda idd: str(audio_features[idd]['key']))
tracks_df['liveness'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['liveness'])
tracks_df['instrumentalness'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['instrumentalness'])
tracks_df['energy'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['energy'])
tracks_df['tempo'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['tempo'])
tracks_df['time_signature'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['time_signature'])
tracks_df['loudness'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['loudness'])
tracks_df['danceability'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['danceability'])
tracks_df['valence'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['valence'])

df = tracks_df

st.set_page_config(layout="wide")

hide_streamlit_style = """ footer {visibility: hidden;} """
st.markdown(hide_streamlit_style, unsafe_allow_html=True)

st.title('Spotify User Dashboard')

col1, col2 = st.beta_columns(2)

col1.header("Your Latest added Song")
top_5_songs = df[['name', 'artists']].head(5)
col1.table(top_5_songs)

col2.header("Your Top 10 Artists")
df1 = df['artists'].value_counts()[:11].to_frame()
df1['Name'] = df1.index
df1.rename(columns={'artists': 'Songs'}, inplace=True)
fig = px.pie(df1, values='Songs', names='Name', hole=0.2)
fig.update_traces(textposition='inside', textinfo='label')
col2.plotly_chart(fig, use_container_width=True)

####
col3, col4, col5 = st.beta_columns(3)

ur_favourite_artist = df[['artists']].value_counts().index[0][0]
st.markdown(""" .big-font { font-size:30px !important; font-Weight: bold; } """, unsafe_allow_html=True)

col3.header("Your Favourite Artist")

col4.header("Total Time of Songs")
time = round(df.duration_s.sum() / 3600, 2)

col5.header("Total Number of Songs")

####
col6, col7 = st.beta_columns(2)

col6.header("Your Recommended Songs")
# rec_df holds the recommended songs produced earlier (e.g. loaded from rec_song.csv)
df2 = rec_df[['name', 'artists']]
print(df2)
col6.table(df2.head(10))

col7.header("Features of your Latest Songs")
df3 = rec_df.loc[:10, ['name', 'artists', 'acousticness', 'liveness', 'instrumentalness',
                       'energy', 'danceability', 'valence']]
df3 = df3.T.reset_index()
df3.rename(columns={'index': 'theta', 0: 'zero', 1: 'one', 2: 'two', 3: 'three', 4: 'four', 5: 'five',
                    6: 'six', 7: 'seven', 8: 'eight', 9: 'nine', 10: 'ten', 11: 'eleven', 12: 'twelve'},
           inplace=True)
df3_cols = df3.columns[1:]
len_cols = len(df3_cols)
categories = df3['theta'].tolist()[2:]

fig1 = go.Figure()
for i in range(0, len_cols):
    fig1.add_trace(go.Scatterpolar(
        r=df3[df3_cols[i]][2:].tolist(),
        theta=categories,
        fill='toself',
        name=df3[df3_cols[i]][0]))

fig1.update_layout(
    polar=dict(
        radialaxis=dict(
            visible=True,
            range=[0, 1]
        )),
    showlegend=True
)

col7.plotly_chart(fig1, use_container_width=True)

Final Output

Conclusion

If you need any help setting up PySpark and Kafka on your Windows machine, feel free to contact me at [email protected] or on my LinkedIn page:

The entire code above can be followed on this GitHub repo:

The media shown in this article are not owned by Analytics Vidhya and are used at the Author’s discretion.

