April 2, 2024 12:32 am GMT

Quick tip: Using Apache Spark Structured Streaming with SingleStore Notebooks


Continuing our series on using Apache Spark with SingleStore, we'll look at a simple example of how to read the data in a set of local text files, create vector embeddings and save the file data and embeddings in SingleStore using Spark's Structured Streaming.

The notebook file used in this article is available on GitHub.

Create a SingleStore Cloud account

A previous article showed the steps to create a free SingleStore Cloud account. We'll use the following settings:

  • Workspace Group Name: Spark Demo Group
  • Cloud Provider: AWS
  • Region: US East 1 (N. Virginia)
  • Workspace Name: spark-demo
  • Size: S-00

Create a new notebook

From the left navigation pane in the cloud portal, we'll select Develop > Notebooks.

In the top right of the web page, we'll select New Notebook > New Notebook, as shown in Figure 1.

Figure 1. New Notebook.

We'll call the notebook spark_streaming_demo, select a Blank notebook template from the available options, and save it in the Personal location.

Fill out the notebook

First, let's install Spark:

!pip cache purge --quiet
!conda install -y --quiet -c conda-forge openjdk pyspark

Next, we'll install some libraries:

!pip install openai==0.28 --quiet
!pip install nltk --quiet

We'll enter our OpenAI API Key:

import getpass
import openai
import os

os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API Key:")

Now we'll create two directories to store some jar files and data files:

import os

os.makedirs("jars", exist_ok = True)
os.makedirs("data", exist_ok = True)

We'll now download some jar files, as follows:

import requests

def download_jar(url, destination):
    response = requests.get(url)
    with open(destination, "wb") as f:
        f.write(response.content)

jar_urls = [
    ("", "jars/singlestore-jdbc-client-1.2.1.jar"),
    ("", "jars/singlestore-spark-connector_2.12-4.1.5-spark-3.5.0.jar"),
    ("", "jars/commons-dbcp2-2.12.0.jar"),
    ("", "jars/commons-pool2-2.12.0.jar"),
    ("", "jars/spray-json_3-1.3.6.jar")
]

for url, destination in jar_urls:
    download_jar(url, destination)

print("JAR files downloaded successfully")

These jar files include the SingleStore JDBC Client and the SingleStore Spark Connector, as well as several other jar files needed for connectivity and data management.
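A failed download can silently leave an HTML error page on disk under a .jar name, and Spark would only complain later. Since a jar file is a ZIP archive, a quick sanity check is possible before starting Spark. The following is a minimal sketch; the helper name find_invalid_jars is hypothetical and not part of the original notebook.

```python
import zipfile
from pathlib import Path

def find_invalid_jars(paths):
    """Return the paths that are missing or are not valid ZIP archives."""
    invalid = []
    for path in paths:
        p = Path(path)
        # A jar is a ZIP archive, so is_zipfile() is a cheap sanity check
        if not p.is_file() or not zipfile.is_zipfile(p):
            invalid.append(str(path))
    return invalid
```

In the notebook, it could be called as find_invalid_jars([destination for _, destination in jar_urls]); an empty list means every file at least looks like a jar.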

Now we are ready to create a SparkSession:

from pyspark.sql import SparkSession

# Create a Spark session
spark = (SparkSession
             .builder
             .config("spark.jars", ",".join([destination for _, destination in jar_urls]))
             .appName("Spark Streaming Test")
             .getOrCreate()
        )

spark.sparkContext.setLogLevel("ERROR")

We need some data and we need to create some files. We can create a small number of files with each file containing a meaningful sentence, as follows:

import nltk
import random
from nltk.corpus import wordnet as wn
from nltk.tokenize import word_tokenize

# Download the required NLTK data
nltk.download("punkt")
nltk.download("averaged_perceptron_tagger")
nltk.download("wordnet")
nltk.download("omw")

# Define the directory to save the files
output_dir = "data"

# Generate meaningful sentences
def generate_meaningful_sentence():
    # Choose a random set of synonyms from WordNet
    synset = random.choice(list(wn.all_synsets()))
    # Generate a sentence
    definition = synset.definition()
    tokens = word_tokenize(definition)
    # Capitalise the first word and end with a period
    tokens[0] = tokens[0].capitalize()
    tokens[-1] = tokens[-1] + "."
    return " ".join(tokens)

# Number of files to generate
num_files = 5

# Number of sentences in each file
num_sentences_per_file = 1

# Generate text files
for i in range(num_files):
    file_path = os.path.join(output_dir, f"file_{i+1}.txt")
    with open(file_path, "w") as file:
        for _ in range(num_sentences_per_file):
            # Generate a meaningful sentence
            sentence = generate_meaningful_sentence()
            file.write(sentence + "\n")

We can easily check the contents of the files using the following:

%%bash
for file in data/*.txt; do
    echo "File: $file"
    cat "$file"
    echo "----------------------"
done

Example output:

File: data/file_1.txt
Resembling steel in hardness.
----------------------
File: data/file_2.txt
An instruction that is either not recognized by an operating system or is in violation of the procedural rules.
----------------------
File: data/file_3.txt
Destructive to the spirit or soul.
----------------------
File: data/file_4.txt
Thornbills.
----------------------
File: data/file_5.txt
A small mass of soft material.
----------------------

Now we'll create the connection details to SingleStore:

host = "<HOST>"
password = "<PASSWORD>"
port = "3306"
cluster = host + ":" + port

Replace <HOST> and <PASSWORD> with the values for your environment. These values can be obtained from the workspace using Connect > SQL IDE.
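If you would rather not paste credentials into a notebook cell, the host and port can be read from the environment instead. This is only a sketch: the variable names SINGLESTORE_HOST and SINGLESTORE_PORT and the helper singlestore_endpoint are hypothetical choices for illustration, not something the article or SingleStore defines.

```python
import os

def singlestore_endpoint(env=None):
    """Build the "host:port" endpoint string from environment variables."""
    env = os.environ if env is None else env
    # Hypothetical variable names; raises KeyError if the host is not set
    host = env["SINGLESTORE_HOST"]
    port = env.get("SINGLESTORE_PORT", "3306")
    return f"{host}:{port}"
```

The result plays the same role as the cluster string built from host and port. The password could be collected with getpass.getpass(), as was done for the OpenAI API Key.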

We also need to set some configuration parameters:

spark.conf.set("spark.datasource.singlestore.ddlEndpoint", cluster)
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", password)
spark.conf.set("spark.datasource.singlestore.disablePushdown", "false")

A database is required, so we'll create one:

CREATE DATABASE IF NOT EXISTS spark_demo;

We'll also create a table to store the file data and vector embeddings:

USE spark_demo;

DROP TABLE IF EXISTS streaming;
CREATE TABLE IF NOT EXISTS streaming (
     value TEXT,
     file_name TEXT,
     embedding VECTOR(1536) NOT NULL
);

Now we'll define and register a UDF to convert the text in each file to vector embeddings, as follows:

from pyspark.sql.functions import input_file_name, udf
from pyspark.sql.types import StringType

# Generate embeddings for text
def generate_embeddings(text):
    openai.api_key = os.environ.get("OPENAI_API_KEY")
    # Generate embeddings for text using OpenAI
    return openai.Embedding.create(
        input = text,
        engine = "text-embedding-3-small"
    ).data[0].embedding

# Register the function as a UDF
generate_embeddings_udf = udf(generate_embeddings, StringType())
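When testing the pipeline, it can be convenient to avoid calling OpenAI for every row. The sketch below is a hypothetical stand-in, not part of the original notebook: it derives a deterministic vector of 1536 values from a hash of the text and returns it as a JSON array string, on the assumption that a string of that form is acceptable for the VECTOR(1536) column (as the StringType return type of the UDF suggests). The values carry no semantic meaning.

```python
import hashlib
import json

EMBEDDING_DIM = 1536  # matches VECTOR(1536) in the table definition

def fake_embedding(text, dim=EMBEDDING_DIM):
    """Deterministic pseudo-embedding for offline testing (not semantically meaningful)."""
    values = []
    counter = 0
    while len(values) < dim:
        # Hash the text with a counter to produce as many bytes as needed
        digest = hashlib.sha256(f"{counter}:{text}".encode("utf-8")).digest()
        # Map each byte (0..255) to a float in [-1.0, 1.0]
        values.extend(b / 127.5 - 1.0 for b in digest)
        counter += 1
    return json.dumps(values[:dim])
```

It could be registered in the same way as the real function, for example udf(fake_embedding, StringType()), and swapped back once the rest of the pipeline works.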

Now we are ready to read the data in each file, convert the text to embeddings and write the data to SingleStore:

import time

input_dir = output_dir

# Read from the directory
df = (spark.readStream
    .format("text")
    .option("path", input_dir)
    .load()
    .withColumn("file_name", input_file_name())
)

# Apply the function to the DataFrame to generate embeddings for each row
df_with_embeddings = df.withColumn("embedding", generate_embeddings_udf("value"))

# Write each batch of data to SingleStore
def write_to_singlestore(df_with_embeddings, epoch_id):
    (df_with_embeddings.write
         .format("singlestore")
         .option("loadDataCompression", "LZ4")
         .mode("append")
         .save("spark_demo.streaming")
    )

# Write the streaming DataFrame to SingleStore using foreachBatch
query = (df_with_embeddings.writeStream
    .foreachBatch(write_to_singlestore)
    .start()
)

# Wait for the query to finish processing
while query.isActive:
    time.sleep(1)
    if not query.status["isDataAvailable"]:
        query.stop()
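The wait loop above stops the query on the first poll that reports no data available. A slightly more cautious variant is sketched below: it only stops after several consecutive idle polls, and also treats an active trigger as work in progress. The helper name stop_when_idle and its parameters are hypothetical; the sketch assumes that query.status is a dictionary with isDataAvailable and isTriggerActive keys, as in PySpark's StreamingQuery.

```python
import time

def stop_when_idle(query, poll_seconds=1.0, idle_polls=3, sleep=time.sleep):
    """Stop `query` after `idle_polls` consecutive polls with no work pending."""
    idle = 0
    while query.isActive:
        sleep(poll_seconds)
        status = query.status
        # Treat the query as busy if data is waiting or a trigger is running
        busy = status.get("isDataAvailable") or status.get("isTriggerActive")
        idle = 0 if busy else idle + 1
        if idle >= idle_polls:
            query.stop()
```

It would replace the while loop with a single call, stop_when_idle(query). The sleep parameter exists only so that the function can be exercised quickly with a stand-in query object.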

We can use an SQL statement to quickly view the data:

USE spark_demo;

SELECT
    SUBSTR(value, 1, 30) AS value,
    SUBSTR(file_name, LENGTH(file_name) - 9) AS file_name,
    SUBSTR(embedding, 1, 50) AS embedding
FROM streaming;

Finally, we can stop Spark:

spark.stop()


In this short article, we used Spark's Structured Streaming to read data from text files located in a local directory, created vector embeddings for the data in each file and stored the file data and embeddings in SingleStore.

We could improve the code to keep monitoring the directory for new files that may arrive. We could also improve the code to manage larger quantities of text in each data file by chunking the data, which would require some schema changes so that we could correctly manage all the data for each data file.
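As a starting point for the chunking idea, the sketch below splits text into overlapping, word-based chunks and numbers them. The function name chunk_text, the default sizes and the notion of storing the index in an extra column (for example, a hypothetical chunk_index column alongside file_name) are assumptions for illustration, not part of the original notebook.

```python
def chunk_text(text, max_words=200, overlap=20):
    """Split text into overlapping word-based chunks as (chunk_index, chunk) pairs."""
    if max_words <= 0 or not 0 <= overlap < max_words:
        raise ValueError("need max_words > 0 and 0 <= overlap < max_words")
    words = text.split()
    step = max_words - overlap
    chunks = []
    for index, start in enumerate(range(0, len(words), step)):
        chunks.append((index, " ".join(words[start:start + max_words])))
        # Stop once a chunk has reached the end of the text
        if start + max_words >= len(words):
            break
    return chunks
```

Each chunk would then get its own embedding and its own row, with the file name and chunk index together identifying where the text came from.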

