Setup PySpark loader, and reorganize.

- remove lots of pom cruft
ruebot committed Dec 3, 2019
1 parent 79b747c commit 416ccc01bda7c8a2dce23470df9cf32f1bbca0c0
.gitignore
@@ -18,3 +18,5 @@ src/main/python/tf/model.zip
 src/main/python/tf/util/spark.conf
 src/main/python/tf/model/graph/
 src/main/python/tf/model/category/
+.ipynb_checkpoints/
+*.ipynb
pom.xml
@@ -43,8 +43,6 @@
     <surefire.plugin.version>2.22.0</surefire.plugin.version>
     <jacoco.plugin.version>0.8.4</jacoco.plugin.version>
     <versions.plugin.version>2.1</versions.plugin.version>
-    <tika.version>1.22</tika.version>
-    <jackson.version>2.10.0</jackson.version>
   </properties>
 
   <licenses>
@@ -480,31 +478,6 @@
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.jsoup</groupId>
-      <artifactId>jsoup</artifactId>
-      <version>1.7.3</version>
-    </dependency>
-    <dependency>
-      <groupId>org.rogach</groupId>
-      <artifactId>scallop_${scala.binary.version}</artifactId>
-      <version>3.3.1</version>
-    </dependency>
-    <dependency> <!-- Needed for running boilerpipe, but will compile without. -->
-      <groupId>com.syncthemall</groupId>
-      <artifactId>boilerpipe</artifactId>
-      <version>1.2.2</version>
-    </dependency>
-    <dependency> <!-- Needed for running boilerpipe. -->
-      <groupId>xerces</groupId>
-      <artifactId>xercesImpl</artifactId>
-      <version>2.12.0</version>
-    </dependency>
-    <dependency>
-      <groupId>tl.lin</groupId>
-      <artifactId>lintools-datatypes</artifactId>
-      <version>1.1.1</version>
-    </dependency>
   </dependencies>
 
   <developers>
twut/__init__.py (new file)
@@ -0,0 +1,3 @@
+from twut.common import Tweets
+
+__all__ = ["Tweets"]
twut/common.py (new file)
@@ -0,0 +1,14 @@
+from pyspark.sql import DataFrame
+
+class Tweets:
+    def __init__(self, sc, sqlContext, tweets):
+        self.sc = sc
+        self.sqlContext = sqlContext
+        self.loader = sc._jvm.io.archivesunleashed.TwutPy(sc._jsc.sc())
+        self.tweets = tweets
+
+    def ids(self):
+        return DataFrame(self.loader.ids(self.tweets), self.sqlContext)
+
+    def userInfo(self):
+        return DataFrame(self.loader.userInfo(self.tweets), self.sqlContext)
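
A note on usage (not part of this commit): the wrapper hands self.tweets straight to the JVM loader, so the caller presumably supplies the underlying Java DataFrame (tweets_df._jdf) rather than the PySpark wrapper, which py4j cannot marshal across the gateway. A minimal sketch, assuming the twut jar is on the Spark classpath and a local tweets.jsonl input (both hypothetical):

    from pyspark.sql import SQLContext, SparkSession
    from twut import Tweets

    # Hypothetical session; launch with, e.g.,
    # pyspark --jars twut.jar --driver-class-path twut.jar
    spark = SparkSession.builder.appName("twut-example").getOrCreate()
    sc = spark.sparkContext
    sqlContext = SQLContext(sc)

    # Line-oriented Twitter JSON, one tweet per line.
    tweets_df = spark.read.json("tweets.jsonl")

    # Pass the underlying JVM DataFrame (_jdf), not the PySpark wrapper.
    tweets = Tweets(sc, sqlContext, tweets_df._jdf)
    tweets.ids().show()
    tweets.userInfo().show()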
TwutPy.scala (new file)
@@ -0,0 +1,38 @@
+/*
+ * Copyright © 2019 The Archives Unleashed Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.archivesunleashed
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.DataFrame
+
+class TwutPy(sc: SparkContext) {
+
+  /** Creates a DataFrame of Tweet IDs. */
+  def ids(tweets: DataFrame): DataFrame = {
+    io.archivesunleashed.ids(tweets)
+  }
+
+  /** Creates a DataFrame of Twitter User Info. */
+  def userInfo(tweets: DataFrame): DataFrame = {
+    io.archivesunleashed.userInfo(tweets)
+  }
+
+  /** Creates a DataFrame of tweeted URLs. */
+  def urls(tweets: DataFrame): DataFrame = {
+    io.archivesunleashed.urls(tweets)
+  }
+}
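
TwutPy is the py4j entry point: a plain JVM class with a SparkContext constructor is straightforward to instantiate from Python, whereas the package object's functions are awkward to reach through the gateway. Note that each method body qualifies its call as io.archivesunleashed.ids and so on; an unqualified ids(tweets) inside def ids would resolve to the class method itself and recurse forever. A rough sketch of the raw bridge the Tweets wrapper hides (sc, sqlContext, and tweets_df as in the previous sketch):

    from pyspark.sql import DataFrame

    # Instantiate the JVM-side loader through py4j.
    jloader = sc._jvm.io.archivesunleashed.TwutPy(sc._jsc.sc())

    # Call it with the Java DataFrame and re-wrap the result for PySpark.
    ids_df = DataFrame(jloader.ids(tweets_df._jdf), sqlContext)
    ids_df.show()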
package.scala
@@ -14,15 +14,15 @@
  * limitations under the License.
  */
 
-package io.archivesunleashed.twut
+package io
 
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.DataFrame
 
 /**
  * Package object which supplies implicits to augment generic DataFrames with twut-specific transformations.
  */
-package object twut {
+package object archivesunleashed {
   /** Creates a DataFrame of Tweet IDs.
     *
     * @param tweets DataFrame of line-oriented Twitter JSON
TwutTest.scala
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package io.archivesunleashed.twut
+package io.archivesunleashed
 
 import com.google.common.io.Resources
 import org.apache.spark.sql.SparkSession
@@ -46,14 +46,13 @@ class TwutTest extends FunSuite with BeforeAndAfter {
     // scalastyle:on
     val tweetsDF = spark.read.json(tweets)
 
-    val ids = twut
-      .ids(tweetsDF)
+    val idsTest = ids(tweetsDF)
       .orderBy(desc("id_str"))
       .head(3)
-    assert(ids.size == 3)
-    assert("1201505319286984705" == ids(0).get(0))
-    assert("1201505319286755328" == ids(1).get(0))
-    assert("1201505319282565121" == ids(2).get(0))
+    assert(idsTest.size == 3)
+    assert("1201505319286984705" == idsTest(0).get(0))
+    assert("1201505319286755328" == idsTest(1).get(0))
+    assert("1201505319282565121" == idsTest(2).get(0))
   }
 
   test("User Info Extraction") {
@@ -63,20 +62,19 @@ class TwutTest extends FunSuite with BeforeAndAfter {
     // scalastyle:on
     val tweetsDF = spark.read.json(tweets)
 
-    val userInfo = twut
-      .userInfo(tweetsDF)
+    val userInfoTest = userInfo(tweetsDF)
       .orderBy(desc("id_str"))
       .head(1)
-    assert(userInfo.size == 1)
-    assert(2331 == userInfo(0).get(0))
-    assert(91 == userInfo(0).get(1))
-    assert(83 == userInfo(0).get(2))
-    assert("973424490934714368" == userInfo(0).get(3))
-    assert("日本 山口" == userInfo(0).get(4))
-    assert("イサオ(^^)最近ディスクにハマル🎵" == userInfo(0).get(5))
-    assert("isao777sp2" == userInfo(0).get(6))
-    assert(2137 == userInfo(0).get(7))
-    assert(false == userInfo(0).get(8))
+    assert(userInfoTest.size == 1)
+    assert(2331 == userInfoTest(0).get(0))
+    assert(91 == userInfoTest(0).get(1))
+    assert(83 == userInfoTest(0).get(2))
+    assert("973424490934714368" == userInfoTest(0).get(3))
+    assert("日本 山口" == userInfoTest(0).get(4))
+    assert("イサオ(^^)最近ディスクにハマル🎵" == userInfoTest(0).get(5))
+    assert("isao777sp2" == userInfoTest(0).get(6))
+    assert(2137 == userInfoTest(0).get(7))
+    assert(false == userInfoTest(0).get(8))
   }
 
   after {