All communication within Apache projects happens on, or is mirrored to, mailing lists. Most of these lists are public and archived. The Mail Data Set is derived from the public archives of Apache Flink’s developer and issues mailing lists (dev@flink.apache.org and issues@flink.apache.org).

1. Download the Mail Data Set

Download the Mail Data Set by running the following command:

wget https://raw.githubusercontent.com/dataArtisans/eit-summerschool-15-exercises/master/src/main/resources/dev-flink.apache.org.archive

The downloaded file, dev-flink.apache.org.archive, contains the Mail Data Set.

Data format of the Mail Data Set

The Mail Data Set is stored in a delimited text format. Mail records are separated by a newline character. Each mail record has five fields:

MessageID  : String // a unique message id
Timestamp  : String // the mail daemon timestamp
Sender     : String // the sender of the mail
Subject    : String // the subject of the mail
Replied-To : String // the message id of the mail this mail replies to
                    //   (may be “null”)

which are separated by a “|” character.

Hence, the format of the file is

<MessageId>|<Timestamp>|<Sender>|<Subject>|<RepliedToMessageId>
<MessageId>|<Timestamp>|...
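To make the record layout concrete, here is a minimal plain-Java sketch that splits one line into its five fields. It does not use Flink. The class name MailRecord, its field names, and the sample line are hypothetical and made up for illustration; the sketch also assumes that no field itself contains a “|” character.

```java
// Hypothetical helper for illustration; not part of Flink or the exercise code.
final class MailRecord {
    final String messageId;
    final String timestamp;
    final String sender;
    final String subject;
    final String repliedTo; // null when the file holds the literal text "null"

    MailRecord(String messageId, String timestamp, String sender,
               String subject, String repliedTo) {
        this.messageId = messageId;
        this.timestamp = timestamp;
        this.sender = sender;
        this.subject = subject;
        this.repliedTo = repliedTo;
    }

    // Parses one line of the form
    // <MessageId>|<Timestamp>|<Sender>|<Subject>|<RepliedToMessageId>
    static MailRecord parse(String line) {
        // "|" is a regex metacharacter, so it has to be escaped.
        // The limit -1 keeps trailing empty fields instead of dropping them.
        String[] fields = line.split("\\|", -1);
        if (fields.length != 5) {
            throw new IllegalArgumentException(
                "Expected 5 fields but found " + fields.length + ": " + line);
        }
        // Map the textual "null" marker to a real null reference.
        String repliedTo = "null".equals(fields[4]) ? null : fields[4];
        return new MailRecord(fields[0], fields[1], fields[2], fields[3], repliedTo);
    }

    public static void main(String[] args) {
        // Made-up sample line, not taken from the real archive.
        MailRecord r = MailRecord.parse(
            "<id-1@example.org>|2015-06-01|Jane Doe <jane@example.org>|Hello|null");
        System.out.println(r.sender + " / " + r.subject + " / " + r.repliedTo);
    }
}
```

Mapping the “null” marker to a real null is a design choice of this sketch; the Flink readers shown in this section return the field as the plain string instead.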

The Mail Data Set can be read using Flink’s CsvInputFormat:

Java

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;

// get an ExecutionEnvironment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read all fields
DataSet<Tuple5<String, String, String, String, String>> mails =
  env.readCsvFile("/your/output/mail-data")
    .lineDelimiter("\n")
    .fieldDelimiter("|")
    .types(String.class, String.class, String.class,
           String.class, String.class);

// read only the sender and replied-to fields
// (the mask "00101" selects the zero-based positions 2 and 4)
DataSet<Tuple2<String, String>> senderRepliedTo =
  env.readCsvFile("/your/output/mail-data")
    .lineDelimiter("\n")
    .fieldDelimiter("|")
    .includeFields("00101")
    .types(String.class, String.class);

Scala

import org.apache.flink.api.scala._

// get an ExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment

// read all fields
val mails = env.readCsvFile[(String, String, String, String, String)](
    "/your/output/mail-data",
    lineDelimiter = "\n",
    fieldDelimiter = "|"
  )

// read only the sender and replied-to fields
// (zero-based positions 2 and 4)
val senderRepliedTo = env.readCsvFile[(String, String)](
    "/your/output/mail-data",
    lineDelimiter = "\n",
    fieldDelimiter = "|",
    includedFields = Array(2, 4)
  )
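
The Java mask "00101" and the Scala index array Array(2, 4) both select the zero-based positions 2 and 4 of each record, which in the five-field layout above are Sender and Replied-To. The following plain-Java sketch illustrates that reading of the mask. It is not Flink's implementation: the class FieldMaskSketch and its method project are hypothetical, and it only handles the characters '0' and '1'.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of a field mask; not Flink code.
final class FieldMaskSketch {

    // Returns the fields whose mask character is '1'.
    // Positions beyond the end of the mask are skipped.
    static List<String> project(String[] fields, String mask) {
        List<String> selected = new ArrayList<>();
        for (int i = 0; i < fields.length && i < mask.length(); i++) {
            if (mask.charAt(i) == '1') {
                selected.add(fields[i]);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        String[] fields = {"MessageID", "Timestamp", "Sender", "Subject", "Replied-To"};
        System.out.println(project(fields, "00101")); // prints [Sender, Replied-To]
    }
}
```

Under the same reading, a mask of "00110" would select Sender and Subject instead.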