December 30, 2021 08:29 pm GMT

Streaming Tweets with Go

Building with free APIs is a great way to teach yourself new skills in languages you like. I've always found APIs to be an underrated way to learn something new. Building with APIs brings challenges that force you to learn parts of programming that video tutorials cannot teach.

The Twitter API's filtered stream endpoint allows you to filter the real-time stream of public tweets. You can tap into Twitter discussions by filtering tweets for specific attributes. You can find the latest job postings, monitor weather events, or keep on top of trends.

In this article I will discuss how to create Twitter rules and manage a stream with my open source library, twitterstream. I built this library for my project findtechjobs so I could find the latest tech jobs posted on Twitter.

If you want a complete code example to get started, head over to the examples on twitterstream.

Where do I start?

The first step is to create an app on Twitter Developers and obtain a set of consumer keys. Once you have an API key and an API secret key, you can generate an access token with twitterstream.

Generate an Access Token

We can use twitterstream to generate an access token. This access token will be used to authenticate all network requests going forward. In the code below, we make a network request to Twitter's oauth2/token endpoint using the 'Basic' HTTP authentication scheme. Then we create an instance of twitterstream with our access token.

    tok, err := twitterstream.NewTokenGenerator().SetApiKeyAndSecret("YOUR_KEY", "YOUR_SECRET_KEY").RequestBearerToken()

    // Create an instance of twitter api
    api := twitterstream.NewTwitterStream(tok.AccessToken)
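To make the 'Basic' scheme less abstract, here is a rough sketch of what such a token request could look like when built by hand with the standard library. The helper name newBearerTokenRequest is made up for this illustration and is not part of twitterstream. The endpoint URL and the grant_type=client_credentials body follow Twitter's app-only authentication flow as I understand it. The request is only built, never sent.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"net/http"
	"strings"
)

// tokenURL is Twitter's OAuth 2.0 token endpoint for app-only authentication.
const tokenURL = "https://api.twitter.com/oauth2/token"

// newBearerTokenRequest builds (but does not send) a token request.
// Hypothetical helper for illustration; twitterstream does this work for you.
func newBearerTokenRequest(apiKey, apiSecret string) (*http.Request, error) {
	body := strings.NewReader("grant_type=client_credentials")
	req, err := http.NewRequest(http.MethodPost, tokenURL, body)
	if err != nil {
		return nil, err
	}
	// SetBasicAuth writes "Authorization: Basic base64(key:secret)".
	req.SetBasicAuth(apiKey, apiSecret)
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded;charset=UTF-8")
	return req, nil
}

func main() {
	req, err := newBearerTokenRequest("YOUR_KEY", "YOUR_SECRET_KEY")
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL)
	fmt.Println(req.Header.Get("Authorization"))

	// Decoding the credential shows it is simply "key:secret".
	encoded := strings.TrimPrefix(req.Header.Get("Authorization"), "Basic ")
	decoded, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(decoded))
}
```

The response to a real request carries the bearer token, which is what tok.AccessToken holds in the snippet above.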

Set up Streaming Rules

Streaming rules make your stream deliver relevant information. The rules match a variety of Twitter attributes such as message keywords, hashtags, and URLs. Creating great rules is fundamental to having a successful Twitter stream. It's important to continue refining your rules as you stream so you can harvest relevant information.

Let's create a stream for software engineer job postings with twitterstream. A valid job posting tweet should:

  • Be posted in the English language
  • Not be a retweet
  • Not be a quote tweet
  • Contain the word hiring
  • Contain the words software developer or software engineer

The twitterstream package makes building rules easy. We can use a NewRuleBuilder to create as many rules as the Twitter API allows for our consumer keys.

    rules := twitterstream.NewRuleBuilder().
        AddRule("lang:en -is:retweet -is:quote hiring (software developer OR software engineer)", "hiring software role").
        Build()

    res, err := api.Rules.Create(rules, false)

The first part is using twitterstream to create a NewRuleBuilder.

We pass two arguments to AddRule. The first is a long string with many operators. Successive operators separated by a space result in boolean "AND" logic, meaning that tweets match only if both conditions are met. For example, cats dogs will match tweets that contain both the words cats and dogs. The second argument is the tag label. This is free-form text you can use to identify the rules that matched a specific tweet in the streaming response. Tags can be the same across rules.

Let's focus on the first argument. Each operator does something unique:

  • The first is lang:en, which takes a BCP 47 language identifier. This filters the stream for tweets posted in the English language. You can only use a single lang operator in a rule, and it must be used in conjunction with other operators.

  • Then we exclude retweets with -is:retweet. We use NOT logic (negation) by including a minus sign in front of our operator. Negation can be applied to words too. For example, cat #meme -grumpy will match tweets that contain the word cat and the hashtag #meme but do not contain the word grumpy.

  • We also exclude quote tweets with -is:quote. Quote tweets are tweets with comments, and I've found this operator very useful. When I was building findtechjobs.io, I encountered a lot of people retweeting an article about automated hiring with their opinion. These quote tweets cluttered my dataset with tweets unrelated to job postings.

  • I then narrow my stream to tweets that include the word hiring. People who tweet about jobs would say "My team is hiring" or "StartupCo is hiring".

  • Finally, (software developer OR software engineer) is a grouping of operators combined with OR logic. A tweet will match if it contains either of these pairs of words.
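The way these operators combine can be sketched with a few string helpers. The helpers and, not, and anyOf are names invented for this illustration and are not part of twitterstream. They only assemble the rule string, which you would then pass to AddRule as before.

```go
package main

import (
	"fmt"
	"strings"
)

// and joins operators with spaces, which the rule syntax treats as AND.
func and(parts ...string) string {
	return strings.Join(parts, " ")
}

// not negates a single operator or keyword by prefixing a minus sign.
func not(operator string) string {
	return "-" + operator
}

// anyOf groups alternatives in parentheses and joins them with OR.
func anyOf(alternatives ...string) string {
	return "(" + strings.Join(alternatives, " OR ") + ")"
}

func main() {
	rule := and(
		"lang:en",
		not("is:retweet"),
		not("is:quote"),
		"hiring",
		anyOf("software developer", "software engineer"),
	)
	// Prints the same rule string used in the article.
	fmt.Println(rule)
}
```

Building the string from named pieces can make longer rules easier to review, although a plain string literal works just as well for a single rule.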

After we build our rules, we create them with api.Rules.Create. If you want to delete your rules, you can use api.Rules.Delete with the ID of each rule you currently have. You can find your current rules with api.Rules.Get.

You can learn more about rule operators here. Additionally, the endpoint that creates the rules is documented here.

Set the Unmarshal Hook

We need to create our own struct so we can unmarshal each tweet cleanly. Twitter's filtered stream endpoint allows us to fetch additional information for each tweet (more on this later). To make this data easy to access, we create a struct that represents our data model.

    type StreamDataExample struct {
        Data struct {
            Text      string    `json:"text"`
            ID        string    `json:"id"`
            CreatedAt time.Time `json:"created_at"`
            AuthorID  string    `json:"author_id"`
        } `json:"data"`
        Includes struct {
            Users []struct {
                ID       string `json:"id"`
                Name     string `json:"name"`
                Username string `json:"username"`
            } `json:"users"`
        } `json:"includes"`
        MatchingRules []struct {
            ID  string `json:"id"`
            Tag string `json:"tag"`
        } `json:"matching_rules"`
    }

Every tweet that is streamed is returned as a []byte by default. We can turn our data into something usable by unmarshaling each tweet into the struct StreamDataExample. It's important to set an unmarshal hook with SetUnmarshalHook so we can process the []byte in a goroutine-safe way.

    api.SetUnmarshalHook(func(bytes []byte) (interface{}, error) {
        data := StreamDataExample{}
        // Declare err outside the if statement so it is still in scope at the return.
        err := json.Unmarshal(bytes, &data)
        if err != nil {
            fmt.Printf("failed to unmarshal bytes: %v", err)
        }
        return data, err
    })

If you are uncertain what your data model will look like, you can always create a string from the slice of bytes.

    api.SetUnmarshalHook(func(bytes []byte) (interface{}, error) {
        return string(bytes), nil
    })
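To check a data model before wiring it into a stream, it can help to unmarshal a sample payload by hand. The sketch below repeats the struct from this section so it runs on its own. The JSON in samplePayload is invented for this example and only shaped to fit the struct, so a real payload from Twitter may differ. decodeTweet is a hypothetical helper name.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// StreamDataExample mirrors the struct defined in the article.
type StreamDataExample struct {
	Data struct {
		Text      string    `json:"text"`
		ID        string    `json:"id"`
		CreatedAt time.Time `json:"created_at"`
		AuthorID  string    `json:"author_id"`
	} `json:"data"`
	Includes struct {
		Users []struct {
			ID       string `json:"id"`
			Name     string `json:"name"`
			Username string `json:"username"`
		} `json:"users"`
	} `json:"includes"`
	MatchingRules []struct {
		ID  string `json:"id"`
		Tag string `json:"tag"`
	} `json:"matching_rules"`
}

// samplePayload is made-up data for illustration, not a captured response.
const samplePayload = `{
  "data": {
    "id": "1",
    "text": "My team is hiring a software engineer",
    "created_at": "2021-12-30T20:29:00.000Z",
    "author_id": "42"
  },
  "includes": {
    "users": [{"id": "42", "name": "Example User", "username": "example_user"}]
  },
  "matching_rules": [{"id": "100", "tag": "hiring software role"}]
}`

// decodeTweet does the same work as the unmarshal hook, minus the library.
func decodeTweet(raw []byte) (StreamDataExample, error) {
	var data StreamDataExample
	err := json.Unmarshal(raw, &data)
	return data, err
}

func main() {
	tweet, err := decodeTweet([]byte(samplePayload))
	if err != nil {
		panic(err)
	}
	fmt.Println(tweet.Data.Text)
	fmt.Println(tweet.Includes.Users[0].Username)
	fmt.Println(tweet.MatchingRules[0].Tag)
	fmt.Println(tweet.Data.CreatedAt.Format(time.RFC3339))
}
```

If a field comes back empty, the usual suspects are a mismatched json tag or a field that was never requested from the API.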

Starting a Stream

After creating our streaming rules and unmarshal hook, we are ready to start streaming tweets. By default, Twitter returns a limited amount of information about each tweet when we stream. We can request additional information on each tweet with a stream expansion.

    streamExpansions := twitterstream.NewStreamQueryParamsBuilder().
        AddExpansion("author_id").
        AddTweetField("created_at").
        Build()

    // StartStream will start the stream
    err = api.StartStream(streamExpansions)

We first create some stream expansions with a NewStreamQueryParamsBuilder. This builder will create query parameters to start our stream with. Here, we are adding two additional pieces of information to each tweet:

  • AddExpansion("author_id") will request the author's ID for each tweet streamed. This is useful if you are keeping track of users who are tweeting.
  • AddTweetField("created_at") will request the time the tweet was posted. This is useful if you need to sort tweets chronologically.

You can learn more about the available stream expansions here.
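Under the hood, expansions and fields like these end up as query parameters on the stream URL. The sketch below shows roughly how that could be done with net/url. buildStreamURL is a hypothetical helper, not the library's implementation, and the endpoint and parameter names (expansions, tweet.fields) are the ones I know from Twitter's v2 filtered stream, so treat them as assumptions.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// streamEndpoint is assumed to be the v2 filtered stream endpoint.
const streamEndpoint = "https://api.twitter.com/2/tweets/search/stream"

// buildStreamURL turns expansions and tweet fields into query parameters.
// Multiple values for one parameter are sent as a comma-separated list.
func buildStreamURL(expansions, tweetFields []string) string {
	query := url.Values{}
	if len(expansions) > 0 {
		query.Set("expansions", strings.Join(expansions, ","))
	}
	if len(tweetFields) > 0 {
		query.Set("tweet.fields", strings.Join(tweetFields, ","))
	}
	if len(query) == 0 {
		return streamEndpoint
	}
	// Encode sorts parameters by key and percent-encodes the values.
	return streamEndpoint + "?" + query.Encode()
}

func main() {
	fmt.Println(buildStreamURL([]string{"author_id"}, []string{"created_at"}))
}
```

A builder such as NewStreamQueryParamsBuilder saves you from assembling this by hand and keeps the parameter names in one place.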

Then we start the stream with our expansions using api.StartStream. This method will start a long-running GET request to Twitter's streaming endpoint. The response is parsed incrementally for the duration of the network request. If you are interested in learning more about how to consume streaming data from Twitter, you should read their documentation, Consuming Streaming Data.
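Parsing a response incrementally can be sketched with a bufio.Scanner that handles one line at a time. This is a simplified illustration rather than twitterstream's actual code. It assumes each tweet arrives as one JSON object per line and that blank lines are keep-alive signals to skip. The tweet type, readStream, parseAll, and sampleBody are names made up for the example, and a string stands in for the HTTP response body.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// tweet holds only the fields this sketch needs.
type tweet struct {
	Data struct {
		ID   string `json:"id"`
		Text string `json:"text"`
	} `json:"data"`
}

// sampleBody simulates a streamed body: two tweets around a blank keep-alive line.
const sampleBody = "{\"data\":{\"id\":\"1\",\"text\":\"first\"}}\r\n" +
	"\r\n" +
	"{\"data\":{\"id\":\"2\",\"text\":\"second\"}}\r\n"

// readStream decodes one JSON object per line and passes each to handle.
// In a real program r would be the response body of the long-running request.
func readStream(r io.Reader, handle func(tweet)) error {
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" {
			continue // blank line: nothing to decode
		}
		var t tweet
		if err := json.Unmarshal([]byte(line), &t); err != nil {
			return fmt.Errorf("decode tweet: %w", err)
		}
		handle(t)
	}
	return scanner.Err()
}

// parseAll collects the text of every tweet found in body.
func parseAll(body string) ([]string, error) {
	var texts []string
	err := readStream(strings.NewReader(body), func(t tweet) {
		texts = append(texts, t.Data.Text)
	})
	return texts, err
}

func main() {
	texts, err := parseAll(sampleBody)
	if err != nil {
		panic(err)
	}
	fmt.Println(texts)
}
```

Because the scanner yields each line as soon as it is complete, tweets can be handled while the connection stays open.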

Consuming the Stream

Each tweet that is processed in our long-running GET request is sent to a Go channel. We range over this channel to process each tweet and check for errors from Twitter. When an error arrives, we stop the stream with api.StopStream, skip the rest of the loop body with continue, and return to the top of the loop, where the range ends once the channel is closed.

    // Start processing data from twitter after starting the stream
    for tweet := range api.GetMessages() {

        // Handle disconnections from twitter
        if tweet.Err != nil {
            fmt.Printf("got error from twitter: %v", tweet.Err)

            // Stop the stream and wait for the channel to close on the next iteration.
            api.StopStream()
            continue
        }
        result := tweet.Data.(StreamDataExample)

        // Here I am printing out the text.
        // You can send this off to a queue for processing.
        // Or do your processing here in the loop
        fmt.Println(result.Data.Text)
    }
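The pattern of stopping a stream and then waiting for its channel to close can be tried out without Twitter at all. The sketch below uses a made-up fakeStream type that sends messages from a goroutine and closes its channel when it is done or told to stop. It is an assumption about how such a stream might behave, not a description of twitterstream's internals.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errDisconnected = errors.New("stream disconnected")

// message carries either a payload or an error, like the items in the article's loop.
type message struct {
	Data interface{}
	Err  error
}

// fakeStream is a hypothetical stand-in for a real stream.
type fakeStream struct {
	messages chan message
	done     chan struct{}
	stopOnce sync.Once
}

// newFakeStream sends items from a goroutine and closes messages when finished.
func newFakeStream(items []message) *fakeStream {
	s := &fakeStream{
		messages: make(chan message),
		done:     make(chan struct{}),
	}
	go func() {
		defer close(s.messages) // closing the channel ends the consumer's range loop
		for _, m := range items {
			select {
			case s.messages <- m:
			case <-s.done:
				return
			}
		}
	}()
	return s
}

// Stop asks the producer to quit. It is safe to call more than once.
func (s *fakeStream) Stop() {
	s.stopOnce.Do(func() { close(s.done) })
}

// consume collects text until the channel closes and reports the last error seen.
func consume(s *fakeStream) ([]string, error) {
	var texts []string
	var lastErr error
	for m := range s.messages {
		if m.Err != nil {
			lastErr = m.Err
			s.Stop()
			continue // go back to the top and wait for the channel to close
		}
		if text, ok := m.Data.(string); ok {
			texts = append(texts, text)
		}
	}
	return texts, lastErr
}

func main() {
	s := newFakeStream([]message{
		{Data: "first tweet"},
		{Data: "second tweet"},
		{Err: errDisconnected},
	})
	texts, err := consume(s)
	fmt.Println(texts, err)
}
```

The comma-ok form of the type assertion avoids a panic if a message ever carries an unexpected type, which is worth considering in the real loop as well.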

Twitter's servers attempt to hold the stream connection open indefinitely. When a disconnection happens, the error from Twitter is made available in the stream. Disconnections can occur for several reasons:

  • A streaming server is restarted on the Twitter side. This is usually related to a code deploy and should be generally expected and designed around.
  • Your account exceeded your daily/monthly quota of Tweets.
  • You have too many active redundant connections.
  • More disconnect reasons can be found here

Anticipating Disconnects from Twitter

It's important to maintain the connection to Twitter for as long as possible, because missing relevant information in your stream can create poor datasets. You should expect disconnections to occur and build reconnection logic to handle them.

We can build reconnection logic using twitterstream's API and a defer statement. A full example of handling reconnects can be found here. Below is a snippet:

    // This will run forever
    func initiateStream() {
        fmt.Println("Starting Stream")

        // Start the stream
        // And return the library's api
        api := fetchTweets()

        // When the loop below ends, restart the stream
        defer initiateStream()

        // Start processing data from twitter
        for tweet := range api.GetMessages() {
            if tweet.Err != nil {
                fmt.Printf("got error from twitter: %v", tweet.Err)
                api.StopStream()
                continue
            }
            result := tweet.Data.(StreamDataExample)
            fmt.Println(result.Data.Text)
        }

        fmt.Println("Stopped Stream")
    }

After we have started the stream and before we start processing the tweets, we defer a call to the function itself. This will handle reconnections to Twitter whenever the messages channel closes.
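One possible variation, offered as a sketch rather than as the library's recommended approach, is to reconnect in a loop and wait a little longer after each failed attempt. Here the stream argument is a hypothetical stand-in for "start the stream and consume it until the channel closes", and the one second base delay and sixteen second cap are arbitrary values chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errDisconnected = errors.New("stream disconnected")

// backoffDelay doubles base once per previous failure and caps the result at max.
func backoffDelay(attempt int, base, max time.Duration) time.Duration {
	delay := base
	for i := 0; i < attempt && delay < max; i++ {
		delay *= 2
	}
	if delay > max {
		return max
	}
	return delay
}

// noSleep is a no-op sleeper, handy when exercising the loop without waiting.
func noSleep(time.Duration) {}

// runWithReconnect calls stream until it returns nil or maxAttempts is used up.
// sleep is injected so the loop can be tried out without real waiting.
func runWithReconnect(stream func() error, maxAttempts int, sleep func(time.Duration)) (int, error) {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if attempt > 0 {
			sleep(backoffDelay(attempt-1, time.Second, 16*time.Second))
		}
		if err = stream(); err == nil {
			return attempt + 1, nil
		}
	}
	return maxAttempts, err
}

func main() {
	calls := 0
	// This fake stream fails twice and then ends cleanly.
	stream := func() error {
		calls++
		if calls < 3 {
			return errDisconnected
		}
		return nil
	}
	var delays []time.Duration
	record := func(d time.Duration) { delays = append(delays, d) }

	attempts, err := runWithReconnect(stream, 5, record)
	fmt.Println(attempts, err, delays)
}
```

In a real program you would pass time.Sleep as the sleeper. A loop also avoids stacking up deferred calls over a long-lived process, and Twitter's streaming documentation describes its own suggested reconnection delays.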

Final Thoughts

I hope you find this library useful for streaming tweets from Twitter. Building this library was a challenge, and I learned how Go's concurrency model works. If you liked this post, follow me on Twitter as I document my journey in the software world.


Original Link: https://dev.to/fallenstedt/streaming-tweets-with-go-92p
