Mail Data Set
All communication within Apache projects happens on, or is mirrored to, mailing lists. Most of these lists are public and archived. The Mail Data Set is derived from the public archives of Apache Flink’s developer and issues mailing lists (dev@flink.apache.org and issues@flink.apache.org).
1. Download the Mail Data Set
Download the Mail Data Set by running the following command:
wget https://raw.githubusercontent.com/dataArtisans/eit-summerschool-15-exercises/master/src/main/resources/dev-flink.apache.org.archive
The downloaded file, dev-flink.apache.org.archive, contains the Mail Data Set.
Data format of the Mail Data Set
The Mail Data Set is stored as delimited text. Mail records are separated by a newline character. Each mail record has five fields:
MessageID : String // a unique message id
Timestamp : String // the mail daemon timestamp
Sender : String // the sender of the mail
Subject : String // the subject of the mail
Replied-To : String // the message ID of the mail that this mail replies to
// (may be “null”)
The fields are separated by a “|” character.
Hence, the format of the file is:
<MessageId>|<Timestamp>|<Sender>|<Subject>|<RepliedToMessageId>
<MessageId>|<Timestamp>|...
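As a rough illustration of this record layout, the following plain-Java sketch splits one line into its five fields without using Flink. The class name MailRecordParser, its method names, and the sample line are made up for this example and do not come from the data set or from Flink. The sketch also assumes that no field contains a “|” character itself.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch: split one Mail Data Set line into its five fields (hypothetical helper, not part of Flink). */
public class MailRecordParser {

    // Zero-based field positions, in the order listed in the format description.
    static final int MESSAGE_ID = 0;
    static final int TIMESTAMP = 1;
    static final int SENDER = 2;
    static final int SUBJECT = 3;
    static final int REPLIED_TO = 4;

    /** Splits a record on '|' and checks that exactly five fields are present. */
    static List<String> parse(String line) {
        // "\\|" escapes the pipe, which is a regex metacharacter;
        // the limit -1 keeps trailing empty fields instead of dropping them.
        String[] fields = line.split("\\|", -1);
        if (fields.length != 5) {
            throw new IllegalArgumentException(
                "expected 5 fields but found " + fields.length + ": " + line);
        }
        return Arrays.asList(fields);
    }

    /** A record counts as a reply if its Replied-To field is not the string "null". */
    static boolean isReply(List<String> record) {
        return !"null".equals(record.get(REPLIED_TO));
    }

    public static void main(String[] args) {
        // Made-up sample line for illustration only; not taken from the real data set.
        String sample = "<msg-2@example.org>|2015-01-01-10:00:00|"
            + "Jane Doe <jane@example.org>|Re: Hello|<msg-1@example.org>";
        List<String> record = parse(sample);
        System.out.println("Sender:  " + record.get(SENDER));
        System.out.println("Subject: " + record.get(SUBJECT));
        System.out.println("Reply?   " + isReply(record));
    }
}
```

Rejecting lines that do not have exactly five fields is a design choice of this sketch: it makes malformed records visible early instead of silently shifting fields.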
2. Read the Mail Data Set in a Flink program
The Mail Data Set can be read using Flink’s CsvInputFormat:
Java
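// imports needed by this snippet
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;

// get an ExecutionEnvironment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read all five fields
DataSet<Tuple5<String, String, String, String, String>> mails =
    env.readCsvFile("/your/output/mail-data")
        .lineDelimiter("\n")
        .fieldDelimiter("|")
        .types(String.class, String.class, String.class,
               String.class, String.class);

// read only the Sender and Replied-To fields
// (the mask "00101" keeps the third and fifth fields)
DataSet<Tuple2<String, String>> senderRepliedTo =
    env.readCsvFile("/your/output/mail-data")
        .lineDelimiter("\n")
        .fieldDelimiter("|")
        .includeFields("00101")
        .types(String.class, String.class);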
Scala
// brings ExecutionEnvironment and the implicit type information into scope
import org.apache.flink.api.scala._

// get an ExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment

// read all five fields
val mails = env.readCsvFile[(String, String, String, String, String)](
  "/your/output/mail-data",
  lineDelimiter = "\n",
  fieldDelimiter = "|"
)

// read only the Sender and Replied-To fields (zero-based indexes 2 and 4)
val senderRepliedTo = env.readCsvFile[(String, String)](
  "/your/output/mail-data",
  lineDelimiter = "\n",
  fieldDelimiter = "|",
  includedFields = Array(2, 4)
)
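The Java mask "00101" and the Scala array Array(2, 4) select the same columns. The plain-Java sketch below illustrates that mapping: each '1' in the mask keeps the field at that zero-based position. The class FieldMask and its method includedIndexes are hypothetical helpers written for this illustration and are not part of Flink. The sketch only handles the characters '0' and '1'.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: translate an include mask such as "00101" into zero-based field indexes. */
public class FieldMask {

    // Field names in the order given by the data format description.
    static final String[] FIELD_NAMES = {
        "MessageID", "Timestamp", "Sender", "Subject", "Replied-To"
    };

    /** Returns the positions of all '1' characters in the mask. */
    static List<Integer> includedIndexes(String mask) {
        List<Integer> indexes = new ArrayList<>();
        for (int i = 0; i < mask.length(); i++) {
            char c = mask.charAt(i);
            if (c == '1') {
                indexes.add(i);          // keep the field at position i
            } else if (c != '0') {
                // Simplification of this sketch: only '0' and '1' are accepted.
                throw new IllegalArgumentException("unexpected mask character: " + c);
            }
        }
        return indexes;
    }

    public static void main(String[] args) {
        for (int i : includedIndexes("00101")) {
            System.out.println(i + " -> " + FIELD_NAMES[i]);
        }
    }
}
```

Running main prints "2 -> Sender" and "4 -> Replied-To", which matches the two indexes passed to includedFields in the Scala example.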