March 24, 2022 05:37 am GMT

Rust: CSV processing

This blog post is about CSV processing. It sounds boring, but I want to share my experience of reading and writing a CSV file of almost 1 GB in just a few seconds.

This post is part of the Serverless Rust series. You can check out the other parts:

Parts 1 and 2 describe how to set up Rust and VS Code.

Part 3 shows how to process AWS SQS messages in parallel.

Part 4 shows how to execute an AWS Step Functions workflow for each AWS SQS message received.

Part 5 shows how to inject configuration with AWS AppConfig.

Problem

I want to read the IMDb Datasets and process title.basics.tsv.gz so that I can play with Amazon Neptune.

I will not talk about Amazon Neptune in this blog post, but I will focus on CSV processing.

Because the files in this dataset are massive, I already knew from past experience in other languages that this could be a problem. So I asked Nicolas Moutschen for advice, leveraging his experience with Rust. Nicolas pointed me straight away to the libraries and some methods to use.

CSV

I use the csv crate.
There is excellent documentation around, and you can find a few links here:

Processing - take 1

As I said, I started by following Nicolas Moutschen's suggestions for reading and processing data from streams, and Nicolas was kind enough to show me this.

I needed to transform the original CSV into something I could use with Amazon Neptune, so I replaced:

println!("{}", String::from_utf8_lossy(&buffer[..len]));

with the code needed to write a new CSV.

Cargo.toml dependencies:

[dependencies]
async-compression = { version = "0.3.12", features = ["all", "tokio"] }
csv = "1.1.6"
hyper = { version = "0.14", features = ["full"] }
hyper-tls = "0.5.0"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1.8"
tokio-util = { version = "0.6.9", features = ["full"] }

The complete code is the following:

use csv;
use hyper::Client;
use hyper_tls::HttpsConnector;
use std::io;
use tokio::io::AsyncReadExt;
use tokio_stream::StreamExt;
use tokio_util::io::StreamReader;

const LINK: &str = "https://datasets.imdbws.com/title.basics.tsv.gz";

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // HTTPS client used to download the gzipped dataset
    let https = HttpsConnector::new();
    let client = Client::builder().build::<_, hyper::Body>(https);
    let res = client.get(LINK.parse()?).await?;

    // Turn the body stream into an AsyncBufRead so it can be decompressed on the fly
    let body = res
        .into_body()
        .map(|result| result.map_err(|e| io::Error::new(io::ErrorKind::Other, e)));
    let body = StreamReader::new(body);
    let mut decoder = async_compression::tokio::bufread::GzipDecoder::new(body);
    let mut buffer = [0; 1024];

    let mut wtr = csv::Writer::from_path("./export/title.csv")?;
    wtr.write_record(&[
        "~id",
        "~label",
        "titleType",
        "primaryTitle",
        "originalTitle",
        "isAdult",
        "startYear",
        "endYear",
        "runtimeMinutes",
        "genres",
    ])?;

    loop {
        // Each read returns up to 1024 decompressed bytes, not one row
        let len = decoder.read(&mut buffer).await?;
        if len == 0 {
            break;
        }
        let line = String::from_utf8_lossy(&buffer[..len]);
        let line: Vec<&str> = line.split('\t').collect();
        wtr.write_record(&[
            line[0], "movies", line[1], line[2], line[3], line[4], line[5], line[6], line[7],
            line[8],
        ])?;
    }
    wtr.flush()?;
    Ok(())
}

This part of the code (please don't mind the quality):

let line = String::from_utf8_lossy(&buffer[..len]);
let line: Vec<&str> = line.split('\t').collect();
wtr.write_record(&[
    line[0], "movies", line[1], line[2], line[3], line[4], line[5], line[6], line[7],
    line[8],
])?;

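This code has a problem when I convert the bytes to strings.
Each read fills the 1024-byte buffer with whatever decompressed bytes come next, regardless of where a row ends, and splitting that chunk on tabs also ignores the newlines between rows. As a result, the rows produced by this conversion were mixed up.
If, for example, we assume that the original file (tab-separated) is made of: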

tconst  titleType   primaryTitle    originalTitle   isAdult startYear   endYear runtimeMinutes  genres
tt0000001   short   Carmencita  Carmencita  0   1894    \N  1   Documentary,Short
tt0000002   short   Le clown et ses chiens  Le clown et ses chiens  0   1892    \N  5   Animation,Short
tt0000003   short   Pauvre Pierrot  Pauvre Pierrot  0   1892    \N  4   Animation,Comedy,Romance
tt0000004   short   Un bon bock Un bon bock 0   1892    \N  12  Animation,Short
tt0000005   short   Blacksmith Scene    Blacksmith Scene    0   1893    \N  1   Comedy,Short
tt0000006   short   Chinese Opium Den   Chinese Opium Den   0   1894    \N  1   Short
tt0000007   short   Corbett and Courtney Before the Kinetograph Corbett and Courtney Before the Kinetograph 0   1894    \N  1   Short,Sport

A line might be:

// picks up the start of the next row too
tt0000001   short   Carmencita  Carmencita  0   1894    \N  1   Documentary,Short
tt0000002

or

// incomplete
Courtney Before the Kinetograph

Possibly the problem was how I used the libraries, or I missed something, but I could not find a solution, so I moved on.
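For reference, a common way around this is to keep the bytes that follow the last newline in each chunk and prepend them to the next chunk, so only complete rows are ever split on tabs. Converting only complete lines to text also avoids cutting a multi-byte UTF-8 character in half, which from_utf8_lossy on a raw chunk can do. Below is a minimal std-only sketch of that idea; LineAssembler is a hypothetical helper, not part of any crate used here. In the tokio version, wrapping the decoder in tokio::io::BufReader and reading with AsyncBufReadExt::lines() should give the same effect without writing it by hand.

```rust
/// Hypothetical helper: reassembles complete lines from arbitrarily sized chunks.
/// Bytes after the last '\n' in a chunk stay in `pending` until a later chunk
/// completes the line.
struct LineAssembler {
    pending: Vec<u8>,
}

impl LineAssembler {
    fn new() -> Self {
        LineAssembler { pending: Vec::new() }
    }

    /// Feeds one chunk and returns every line it completes (without the '\n').
    fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.pending.extend_from_slice(chunk);
        let mut lines = Vec::new();
        // Repeatedly cut the pending bytes at the first newline.
        while let Some(pos) = self.pending.iter().position(|&b| b == b'\n') {
            // drain(..=pos) removes the line and its newline from the front.
            let line: Vec<u8> = self.pending.drain(..=pos).collect();
            // Decode only complete lines, dropping the trailing '\n'.
            lines.push(String::from_utf8_lossy(&line[..line.len() - 1]).into_owned());
        }
        lines
    }

    /// Returns the last line if the input did not end with '\n'.
    fn finish(self) -> Option<String> {
        if self.pending.is_empty() {
            None
        } else {
            Some(String::from_utf8_lossy(&self.pending).into_owned())
        }
    }
}
```

Each complete line can then be split on '\t' safely, because a row is never spread across two calls.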

Processing - take 2

Next, I tried to feed each chunk of the stream into a CSV ReaderBuilder.

loop {
    let len = decoder.read(&mut buffer).await?;
    if len == 0 {
        break;
    }
    // A new reader per 1024-byte chunk: a row cut at the chunk boundary is still
    // incomplete, and has_headers(true) also skips the first row of every chunk.
    let mut rdr = csv::ReaderBuilder::new()
        .has_headers(true)
        .delimiter(b'\t')
        .flexible(true)
        .from_reader(&buffer[..len]);
    for result in rdr.records() {
        let record = result?;
        wtr.serialize(Record {
            id: record[0].to_string(),
            label: "movies".to_string(),
            title_type: record[1].to_string(),
            primary_title: record[2].to_string(),
            original_title: record[3].to_string(),
            // "1" means adult, anything else is treated as false
            is_adult: &record[4] == "1",
            start_year: record[5].parse::<u16>().unwrap_or_default(),
            end_year: record[6].parse::<u16>().unwrap_or_default(),
            runtime_minutes: record[7].parse::<u16>().unwrap_or_default(),
            genres: record[8].to_string(),
        })?;
    }
}

The problem was the same: incomplete data. With some extra conditions around it I made it work, but I was not happy because the code was ugly, so I decided to move on.

Processing - final take...for now

For this part of the code, I assume that I have already downloaded and decompressed the file, and that the TSV is in a local folder.

use csv;
use serde::{Deserialize, Deserializer, Serialize};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Automatic headers are off: the Neptune header row ("~id", "~label", ...)
    // is written by hand instead of using the struct field names.
    let mut wtr = csv::WriterBuilder::default()
        .has_headers(false)
        .from_path("./export/title.csv")?;
    wtr.write_record(&[
        "~id",
        "~label",
        "titleType",
        "primaryTitle",
        "originalTitle",
        "isAdult",
        "startYear",
        "endYear",
        "runtimeMinutes",
        "genres",
    ])?;

    // Tab-delimited input with a header row; backslash is the escape character
    // and rows of varying length are accepted.
    let mut rdr = csv::ReaderBuilder::new()
        .has_headers(true)
        .delimiter(b'\t')
        .double_quote(false)
        .escape(Some(b'\\'))
        .flexible(true)
        //.comment(Some(b'#'))
        .from_path("./import/title.basics.tsv")?;

    // Deserialize each row into Record and write it straight back out
    for result in rdr.deserialize() {
        let record: Record = result?;
        wtr.serialize(record)?;
    }
    wtr.flush()?;
    Ok(())
}

#[derive(Debug, Deserialize, Serialize)]
struct Record {
    #[serde(alias = "tconst")]
    id: String,
    // There is no label column in the source, so default_label() fills it
    #[serde(default = "default_label")]
    label: String,
    #[serde(alias = "titleType")]
    title_type: String,
    #[serde(alias = "primaryTitle")]
    primary_title: String,
    #[serde(alias = "originalTitle")]
    original_title: String,
    #[serde(alias = "isAdult")]
    #[serde(deserialize_with = "bool_from_string")]
    is_adult: bool,
    #[serde(alias = "startYear")]
    #[serde(deserialize_with = "csv::invalid_option")]
    start_year: Option<u16>,
    #[serde(alias = "endYear")]
    #[serde(deserialize_with = "csv::invalid_option")]
    end_year: Option<u16>,
    #[serde(alias = "runtimeMinutes")]
    #[serde(deserialize_with = "csv::invalid_option")]
    runtime_minutes: Option<u16>,
    #[serde(alias = "genres")]
    #[serde(deserialize_with = "csv::invalid_option")]
    genres: Option<String>,
}

fn default_label() -> String {
    "movies".to_string()
}

/// Deserialize bool from String with custom value mapping
fn bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
where
    D: Deserializer<'de>,
{
    match String::deserialize(deserializer)?.as_ref() {
        "1" => Ok(true),
        "0" => Ok(false),
        _ => Ok(false),
    }
}

I decided to use csv::ReaderBuilder to read the file.
To read this data, I set the following:

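  • Enable headers. The first line is treated as the header row (used to match columns by name) and is not returned as a record.
  • Change the delimiter from a comma to a tab (b'\t').
  • Use the backslash as the escape character for quotes, instead of doubled quotes.
  • Permit flexible-length records, since some rows are in a strange format.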
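To make the header, delimiter and flexible settings above concrete, here is a rough std-only sketch of what they mean for plain, unquoted TSV: the first line becomes the header, fields are split on tabs, and rows may have different lengths. parse_tsv is a hypothetical helper; it ignores quoting and escaping entirely, which is exactly the part the csv crate handles for you.

```rust
/// Hypothetical helper: splits unquoted tab-separated text into a header and rows.
/// No quoting or escaping is handled here.
fn parse_tsv(text: &str) -> (Vec<&str>, Vec<Vec<&str>>) {
    let mut lines = text.lines();
    // has_headers(true): the first line names the columns and is not a data row.
    let header: Vec<&str> = lines
        .next()
        .map(|h| h.split('\t').collect::<Vec<&str>>())
        .unwrap_or_default();
    // delimiter(b'\t'): fields are separated by tabs, not commas.
    // flexible(true): each row keeps however many fields it has.
    let rows: Vec<Vec<&str>> = lines
        .filter(|l| !l.is_empty())
        .map(|l| l.split('\t').collect::<Vec<&str>>())
        .collect();
    (header, rows)
}
```

A strict reader would reject the second row in a sample such as "tt0000002\tshort" for having fewer fields than the header; with flexible(true) it is returned as is.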

Instead of dealing with arbitrary records, I use Serde to deserialize records into specific types. I have applied Serde attributes to the struct fields to map the original column names to mine, for example:

#[derive(Debug, Deserialize, Serialize)]
struct Record {
    #[serde(alias = "tconst")]
    id: String,
  .....
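Conceptually, alias lets a field be filled from a column with a different name, and default (as with default_label) supplies a value when the column does not exist at all. The sketch below spells that out by hand for three fields, looking each column up by its original header name. TitleRecord, field and to_record are hypothetical names used only for illustration; in the real code Serde does all of this from the attributes.

```rust
/// Hypothetical, cut-down record with three of the post's fields.
#[derive(Debug, PartialEq)]
struct TitleRecord {
    id: String,
    label: String,
    title_type: String,
}

/// Finds a value by its original column name, the way
/// #[serde(alias = "tconst")] lets the tconst column fill `id`.
fn field<'a>(header: &[&str], row: &[&'a str], name: &str) -> Option<&'a str> {
    let idx = header.iter().position(|h| *h == name)?;
    row.get(idx).copied()
}

/// Builds a record, or returns None if a required column is missing.
fn to_record(header: &[&str], row: &[&str]) -> Option<TitleRecord> {
    Some(TitleRecord {
        id: field(header, row, "tconst")?.to_string(),
        // No "label" column exists, so a fixed default is used,
        // like #[serde(default = "default_label")].
        label: "movies".to_string(),
        title_type: field(header, row, "titleType")?.to_string(),
    })
}
```

Matching by name rather than by position is what lets the struct fields be declared in any order relative to the file's columns.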

Because the "isAdult" column in the original file contains "1" and "0" rather than a real boolean, I need to convert it, so I wrote a custom deserialize function:

/// Deserialize bool from String with custom value mapping
fn bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
where
    D: Deserializer<'de>,
{
    match String::deserialize(deserializer)?.as_ref() {
        "1" => Ok(true),
        "0" => Ok(false),
        _ => Ok(false),
    }
}
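The catch-all _ => Ok(false) arm means any unexpected value silently becomes false. If you would rather notice bad data, a stricter mapping could return an error instead. Here it is as a plain std-only function; parse_flag is a hypothetical name. Inside bool_from_string, the error message could be turned into D::Error with serde::de::Error::custom.

```rust
/// Hypothetical stricter mapping for the "isAdult" column:
/// unexpected values are reported instead of becoming false.
fn parse_flag(value: &str) -> Result<bool, String> {
    match value {
        "1" => Ok(true),
        "0" => Ok(false),
        other => Err(format!("unexpected isAdult value: {:?}", other)),
    }
}
```

Whether that is worth it depends on the data: for a one-off import, treating odd values as false may be perfectly acceptable.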

Finally, the other columns are not mandatory in the original file, and missing values appear as \N. When the target field has a different type (for example, a number), deserializing \N fails, so I must handle that error.

For this, I can use csv::invalid_option:

#[serde(deserialize_with = "csv::invalid_option")]

It will tell Serde to convert any deserialization errors on this field to a None value.
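Put differently, the numeric columns become None only because parsing \N as a number fails. As far as I can tell, that does not happen for genres: \N is a perfectly valid string, so it deserializes to Some("\N") and would be written out literally. A std-only sketch of both cases (optional_number, optional_text and NULL_MARKER are hypothetical names):

```rust
/// IMDb writes missing values as the two characters \N.
const NULL_MARKER: &str = "\\N";

/// Numeric column: anything that does not parse becomes None,
/// which matches what csv::invalid_option gives the Option<u16> fields.
fn optional_number(field: &str) -> Option<u16> {
    field.parse::<u16>().ok()
}

/// Text column: \N is a valid string, so it has to be compared
/// against the marker explicitly to become None.
fn optional_text(field: &str) -> Option<&str> {
    if field == NULL_MARKER || field.is_empty() {
        None
    } else {
        Some(field)
    }
}
```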

The outcome of the serialization is a CSV with empty values for the missing fields (note the ",,"):

~id,~label,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
tt0000001,movies,short,Carmencita,Carmencita,false,1894,,1,"Documentary,Short"
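The quotes around "Documentary,Short" appear because that field contains the comma delimiter, while None fields are written as nothing at all. A minimal sketch of that rule follows; csv_field and csv_row are hypothetical helpers, and the csv crate's default (quote only when necessary) behaves similarly but covers more edge cases.

```rust
/// Hypothetical helper: formats one output field.
/// None becomes an empty field; values containing a comma, quote or newline
/// are wrapped in quotes, with embedded quotes doubled.
fn csv_field(value: Option<&str>) -> String {
    match value {
        None => String::new(),
        Some(v) if v.contains(|c: char| matches!(c, ',' | '"' | '\n' | '\r')) => {
            format!("\"{}\"", v.replace('"', "\"\""))
        }
        Some(v) => v.to_string(),
    }
}

/// Joins formatted fields into one comma-separated row.
fn csv_row(fields: &[Option<&str>]) -> String {
    fields
        .iter()
        .map(|f| csv_field(*f))
        .collect::<Vec<String>>()
        .join(",")
}
```

Feeding it the Carmencita values, with None for endYear, reproduces the output row above.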

Conclusion

At the time of writing, AWS Lambda has 512 MB of temporary storage, so this use case does not fit because of the file size. However, if Lambda gets more temporary storage one day, we could run it inside AWS Lambda.

One alternative is to use Amazon EFS, a fully managed, flexible, shared file system designed to be consumed by other AWS services. Support for Amazon EFS in AWS Lambda was announced on Jun 16, 2020: Lambda automatically mounts the file system and provides a local path to read and write data. If you want to read more, there is an excellent article here.

Another alternative is to use AWS Batch with Spot Instances, orchestrated by AWS Step Functions through the "Run a Job (.sync)" service integration pattern. After calling AWS Batch SubmitJob, the workflow pauses; when the job is complete, Step Functions progresses to the next state.

The csv crate does a fantastic job, and it is unbelievably fast. Running this script on my computer in release mode, I can process the whole file in around 10 seconds.


Original Link: https://dev.to/aws-builders/rust-csv-processing-1nda
