Skip to content
Please note that GitHub no longer supports your web browser.

We recommend upgrading to the latest Google Chrome or Firefox.

Learn more
Permalink
Browse files

Add new tweet data extraction utilities.

- Add helper method for checking if column exists
- Add text extraction
- Add tests
- Remove URL extraction until we sort out how we want to do it
- Bit of scalastyle clean-up
- Update README example
  • Loading branch information
ruebot committed Dec 3, 2019
1 parent 416ccc0 commit 3761157aee9e1db99a1aa78ec3e7ae830d8d7e0d
@@ -3,7 +3,6 @@
[![Build Status](https://travis-ci.org/archivesunleashed/twut.svg?branch=master)](https://travis-ci.org/archivesunleashed/twut)
[![codecov](https://codecov.io/gh/archivesunleashed/twut/branch/master/graph/badge.svg)](https://codecov.io/gh/archivesunleashed/twut)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/io.archivesunleashed/twut/badge.svg)](https://maven-badges.herokuapp.com/maven-central/io.archivesunleashed/twut)
[![Javadoc](https://javadoc-badge.appspot.com/io.archivesunleashed/twut.svg?label=javadoc)](http://api.docs.archivesunleashed.io/0.18.0/apidocs/index.html)
[![Scaladoc](https://javadoc-badge.appspot.com/io.archivesunleashed/twut.svg?label=scaladoc)](http://api.docs.archivesunleashed.io/0.18.0/scaladocs/index.html)
[![LICENSE](https://img.shields.io/badge/license-Apache-blue.svg?style=flat)](https://www.apache.org/licenses/LICENSE-2.0)
[![Contribution Guidelines](http://img.shields.io/badge/CONTRIBUTING-Guidelines-blue.svg)](./CONTRIBUTING.md)
@@ -46,8 +45,8 @@ Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_222)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import io.archivesunleashed.twut._
import io.archivesunleashed.twut._
scala> import io.archivesunleashed._
import io.archivesunleashed._
scala> val tweets = "/home/nruest/Projects/au/twut/src/test/resources/10-sample.jsonl"
tweets: String = /home/nruest/Projects/au/twut/src/test/resources/10-sample.jsonl
@@ -56,7 +55,7 @@ scala> val tweetsDF = spark.read.json(tweets)
19/12/02 13:38:51 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
tweetsDF: org.apache.spark.sql.DataFrame = [contributors: string, coordinates: string ... 33 more fields]
scala> twut.ids(tweetsDF).show
scala> ids(tweetsDF).show
+-------------------+
| id_str|
+-------------------+
@@ -9,7 +9,7 @@
<check class="org.scalastyle.file.HeaderMatchesChecker" level="warning" enabled="true">
<parameters>
<parameter name="header"><![CDATA[/*
* Copyright © 2017 The Archives Unleashed Project
* Copyright © 2019 The Archives Unleashed Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,7 @@ import org.apache.spark.sql.DataFrame

class TwutPy(sc: SparkContext) {

/** Creates a DataFrame of Tweet IDs. **/
/** Creates a DataFrame of Tweet IDs.
 *
 * Delegates to the `ids` function of the `io.archivesunleashed` package object. The call is
 * fully qualified because an unqualified `ids(tweets)` inside this class binds to this very
 * method and would recurse until the stack overflows.
 *
 * @param tweets DataFrame of line-oriented Twitter JSON
 * @return the DataFrame produced by `io.archivesunleashed.ids`
 */
def ids(tweets: DataFrame): DataFrame = {
  _root_.io.archivesunleashed.ids(tweets)
}
@@ -31,8 +31,13 @@ class TwutPy(sc: SparkContext) {
userInfo(tweets)
}

/** Creates a DataFame of tweeted urls. */
def urls(tweets: DataFrame): DataFrame = {
urls(tweets)
/** Creates a DataFrame of tweet text.
 *
 * Delegates to the `text` function of the `io.archivesunleashed` package object. The call is
 * fully qualified because an unqualified `text(tweets)` inside this class binds to this very
 * method and would recurse until the stack overflows.
 *
 * @param tweets DataFrame of line-oriented Twitter JSON
 * @return the DataFrame produced by `io.archivesunleashed.text`
 */
def text(tweets: DataFrame): DataFrame = {
  _root_.io.archivesunleashed.text(tweets)
}

/** Creates a DataFrame of tweet times.
 *
 * Delegates to the `times` function of the `io.archivesunleashed` package object. The call is
 * fully qualified because an unqualified `times(tweets)` inside this class binds to this very
 * method and would recurse until the stack overflows.
 *
 * @param tweets DataFrame of line-oriented Twitter JSON
 * @return the DataFrame produced by `io.archivesunleashed.times`
 */
def times(tweets: DataFrame): DataFrame = {
  _root_.io.archivesunleashed.times(tweets)
}
}
@@ -16,13 +16,22 @@

package io

import scala.util.Try
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame

/**
* Package object which supplies implicits to augment generic DataFrames with twut-specific transformations.
*/
package object archivesunleashed {

/** Checks to see if a column exists.
 *
 * The lookup `tweets(column)` is wrapped in `Try`, so a column that cannot be resolved
 * yields `false` instead of propagating the failure to the caller.
 *
 * @param tweets DataFrame of line-oriented Twitter JSON
 * @param column name of the column to look up in `tweets`
 * @return `true` if `tweets(column)` succeeds, `false` otherwise.
 */
def hasColumn(tweets: DataFrame, column: String): Boolean = Try(tweets(column)).isSuccess

/** Creates a DataFrame of Tweet IDs.
*
* @param tweets DataFrame of line-oriented Twitter JSON
@@ -53,15 +62,32 @@ package object archivesunleashed {
)
}

/** Creates a DataFame of tweeted urls.
/** Creates a DataFrame of Tweet text.
 *
 * @param tweets DataFrame of line-oriented Twitter JSON
 * @return a DataFrame with the columns `full_text` and `text` when `tweets` has a
 *         `full_text` column, otherwise a single-column DataFrame containing `text`.
 */
def text(tweets: DataFrame): DataFrame = {
  // hasColumn already yields a Boolean, so it is used directly as the condition.
  if (hasColumn(tweets, "full_text")) {
    tweets.select("full_text", "text")
  } else {
    tweets.select("text")
  }
}

/** Creates a DataFrame of Tweet times.
*
* @param tweets DataFrame of line-oriented Twitter JSON
* @return a multi-column DataFrame containing urls.
* @return a single-column DataFrame containing UTC Tweet times.
*/
def urls(tweets: DataFrame): DataFrame = {
def times(tweets: DataFrame): DataFrame = {
tweets.select(
"entities.urls.expanded_url",
"entities.urls.url"
"created_at"
)
}
}
@@ -39,6 +39,20 @@ class TwutTest extends FunSuite with BeforeAndAfter {
sc = new SparkContext(conf)
}

test("Column check") {
  val session = SparkSession.builder().master("local").getOrCreate()
  // scalastyle:off
  import session.implicits._
  // scalastyle:on
  val sampleDF = session.read.json(tweets)

  // The sample data is expected to lack "full_text" and to contain "text".
  assert(!hasColumn(sampleDF, "full_text"))
  assert(hasColumn(sampleDF, "text"))
}

test("ID Extraction") {
val spark = SparkSession.builder().master("local").getOrCreate()
// scalastyle:off
@@ -74,9 +88,44 @@ class TwutTest extends FunSuite with BeforeAndAfter {
assert("イサオ(^^)最近ディスクにハマル🎵" == userInfoTest(0).get(5))
assert("isao777sp2" == userInfoTest(0).get(6))
assert(2137 == userInfoTest(0).get(7))
// scalastyle:off
assert(false == userInfoTest(0).get(8))
// scalastyle:on
}

test("Text Extraction") {
  val session = SparkSession.builder().master("local").getOrCreate()
  // scalastyle:off
  import session.implicits._
  // scalastyle:on
  val sampleDF = session.read.json(tweets)

  // Expected first-column values of the first three extracted rows, in order.
  val expected = Seq(
    "Baket ang pogi mo???",
    "今日すげぇな!#安元江口と夜あそび",
    "@flower_1901 عسى الله يوفقنا 🙏🏻"
  )

  val rows = text(sampleDF).head(expected.size)
  assert(rows.size == 3)
  expected.zipWithIndex.foreach { case (value, i) =>
    assert(value == rows(i).get(0))
  }
}

test("Times Extraction") {
  val session = SparkSession.builder().master("local").getOrCreate()
  // scalastyle:off
  import session.implicits._
  // scalastyle:on
  val sampleDF = session.read.json(tweets)

  // Every one of the first five rows is expected to carry the same timestamp string.
  val expectedTime = "Mon Dec 02 14:16:05 +0000 2019"

  val rows = times(sampleDF).head(5)
  assert(rows.size == 5)
  rows.foreach { row =>
    assert(expectedTime == row.get(0))
  }
}


after {
if (sc != null) {
sc.stop()

0 comments on commit 3761157

Please sign in to comment.
You can’t perform that action at this time.