# Example: WordCount

In this example, we'll implement a simple pipeline that calculates word frequencies in a plain-text document. We'll use the smart constructors pureFlow and ioFlow, which let us define the pipeline's tasks in terms of ordinary Haskell functions.

This example may look familiar to users of Apache Beam, which also includes a WordCount example (see https://beam.apache.org/get-started/wordcount-example/).

## Imports

First, we'll import some additional modules that make it easier to work with text (Data.Text), define dictionaries/maps (Data.Map), perform regex matching (Text.Regex.Posix), and more.

In [1]:
:opt no-lint

{-# LANGUAGE Arrows #-}
{-# LANGUAGE OverloadedStrings #-}

import Data.List (sortBy)
import Data.Map (Map)
import qualified Data.Map as Map
import Data.Ord (comparing)
import qualified Data.Text.IO as T
import qualified Data.Text as T
import Text.Printf (printf)
import Text.Regex.Posix ((=~))

import Funflow

## Helper Functions

Since we're writing our pipeline with function-based Flows, we first need to define the functions that parse the input text and count the words.

In [2]:
-- | (word, number of occurrences)
type TextCount = (T.Text, Int)

-- | Counts how many times each word occurs in a list of words.
--   Also works for lazy lists (e.g. data from readFile)
countWords :: [T.Text] -> [TextCount]
countWords ws = let
    tally :: T.Text -> Map T.Text Int -> Map T.Text Int
    tally k = Map.insertWith (+) k 1
    in
        Map.toList $ foldr tally Map.empty ws

-- | Removes punctuation marks from a text
removePunctuation :: T.Text -> T.Text
removePunctuation = 
    let
        punctuation = ",.?!:;\"\'" :: String
      in T.filter (not . (`elem` punctuation))


-- | Keeps only words made up entirely of Latin letters (hyphens are allowed)
filterWords :: [T.Text] -> [T.Text]
filterWords = 
    let 
        wordsRegex = "^[A-Za-z-]+$" :: String
      in filter $ (=~ wordsRegex) . T.unpack

-- | Sorts a list of word counts in descending order
sortCountsDesc :: [TextCount] -> [TextCount]
sortCountsDesc = sortBy (flip $ comparing snd)

-- | Like countWords, but with sorted results
countWordsSortedDesc :: [T.Text] -> [TextCount]
countWordsSortedDesc = sortCountsDesc . countWords

-- | Prepare word counts for printing
formatCounts :: [TextCount] -> [T.Text]
formatCounts = map (\(w,c) -> T.pack $ printf "%s: %d" (T.unpack w) c)
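To see what the helper chain does to actual text, here is a standalone sketch that runs the same steps (remove punctuation, split into words, filter, count, sort) on a short in-memory string. One assumption to flag: it swaps the Text.Regex.Posix filter for an equivalent Data.Char check so that it only needs the text and containers packages; `summarize` is a hypothetical name for the composed chain.

```haskell
{-# LANGUAGE OverloadedStrings #-}
-- Standalone sketch of the helper chain. The word filter uses Data.Char
-- instead of Text.Regex.Posix (a swapped-in technique, not the notebook's).
import Data.Char (isAsciiLower, isAsciiUpper)
import Data.List (sortBy)
import qualified Data.Map as Map
import Data.Ord (Down (..), comparing)
import qualified Data.Text as T
import qualified Data.Text.IO as T

type TextCount = (T.Text, Int)

-- Tally each word with Map.insertWith, as countWords does
countWords :: [T.Text] -> [TextCount]
countWords = Map.toList . foldr (\w -> Map.insertWith (+) w 1) Map.empty

-- Drop the same punctuation characters as removePunctuation
removePunctuation :: T.Text -> T.Text
removePunctuation = T.filter (`notElem` (",.?!:;\"'" :: String))

-- Stand-in for the regex "^[A-Za-z-]+$": non-empty, ASCII letters or hyphens only
filterWords :: [T.Text] -> [T.Text]
filterWords = filter (\w -> not (T.null w) && T.all isWordChar w)
  where
    isWordChar c = isAsciiLower c || isAsciiUpper c || c == '-'

-- Descending by count; sortBy is stable, so ties keep Map's key order
sortCountsDesc :: [TextCount] -> [TextCount]
sortCountsDesc = sortBy (comparing (Down . snd))

-- Hypothetical name for the composed chain
summarize :: T.Text -> [TextCount]
summarize = sortCountsDesc . countWords . filterWords . T.words . removePunctuation

main :: IO ()
main =
  mapM_ (\(w, c) -> T.putStrLn (w <> ": " <> T.pack (show c)))
        (summarize "the cat, the hat! 42 cats? well-known the")
```

Because sortBy is stable, words with equal counts stay in the Map's key order, which is why capitalized words such as "Lets" and "This" appear before lowercase words with the same count in the notebook's output.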

## Pipeline Definition

With our core utility functions defined, we're ready to define our pipeline. One simple way to structure our pipeline is to divide it into three tasks/operations:

  1. Read the input text file
  2. Parse the input text and return a summary of the word frequencies in it
  3. Write out our results. For this example, we can just write them directly to the terminal.
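Before wrapping anything in Flows, it can help to see the three stages as plain Haskell functions whose types line up one after another. The sketch below is illustrative only: readStage, summarizeStage, writeStage, and runStages are hypothetical names, and summarizeStage is a deliberately simplified counter (no punctuation removal, word filtering, or sorting by count). The comments note the Flow type each stage corresponds to in the pipeline.

```haskell
{-# LANGUAGE OverloadedStrings #-}
-- Sketch: the three WordCount stages as ordinary functions (hypothetical names).
import qualified Data.Map as Map
import qualified Data.Text as T
import qualified Data.Text.IO as T

-- 1. Read the input text file                  (cf. Flow String T.Text)
readStage :: FilePath -> IO T.Text
readStage = T.readFile

-- 2. Summarize word frequencies, purely         (cf. Flow T.Text T.Text)
--    Simplified: words are counted as-is and listed alphabetically
summarizeStage :: T.Text -> T.Text
summarizeStage txt =
  T.unlines [w <> ": " <> T.pack (show n) | (w, n) <- Map.toList counts]
  where
    counts = Map.fromListWith (+) [(w, 1 :: Int) | w <- T.words txt]

-- 3. Write out the results; here, just print them (cf. Flow (String, T.Text) ())
writeStage :: (FilePath, T.Text) -> IO ()
writeStage (_outputPath, summary) = T.putStr summary

-- Plain IO composition of the three stages
runStages :: (FilePath, FilePath) -> IO ()
runStages (inputPath, outputPath) = do
  text <- readStage inputPath
  writeStage (outputPath, summarizeStage text)

main :: IO ()
main = do
  -- Create a small input file in the current directory for the demo
  T.writeFile "sample.txt" "to be or not to be"
  runStages ("sample.txt", "outputs/counts.txt")
```

Only the first and last stages need IO; the middle stage is a pure function, which is the split that ioFlow and pureFlow express.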

Remember that in funflow, we create Flows, each containing at least one task, and compose them into a final, larger Flow DAG. This differs from how DAGs are constructed in other workflow frameworks like Apache Airflow, where tasks are the units that get composed together. The advantage of making entire Flow DAGs composable is that you can reuse whole subsections of your workflow instead of only individual tasks. In other words, because composing Flows produces another Flow, reusability extends from single tasks all the way up to large sub-workflows.
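The idea that "a composition of flows is itself a flow" can be illustrated with a toy model. This is an assumption-laden stand-in, not funflow's implementation: it models a flow as a Kleisli arrow from Control.Arrow, and MiniFlow, pureMini, and effectMini are hypothetical analogues of Flow, pureFlow, and ioFlow.

```haskell
-- Toy model (NOT funflow's implementation): flows as Kleisli arrows.
import Control.Arrow (Kleisli (..), arr, (>>>))
import Data.Char (toUpper)
import Data.Functor.Identity (Identity (..))

-- Hypothetical stand-in for Flow: an arrow of steps with effects in m
type MiniFlow m a b = Kleisli m a b

-- Hypothetical analogue of pureFlow: lift an ordinary function
pureMini :: Monad m => (a -> b) -> MiniFlow m a b
pureMini = arr

-- Hypothetical analogue of ioFlow: lift an effectful function
effectMini :: (a -> m b) -> MiniFlow m a b
effectMini = Kleisli

-- A sub-workflow made of two tasks; its type is just another MiniFlow
normalize :: Monad m => MiniFlow m String String
normalize = pureMini (filter (/= ' ')) >>> pureMini (map toUpper)

-- The whole sub-workflow is reused as a single step in a bigger one
normalizeAndMeasure :: Monad m => MiniFlow m String (String, Int)
normalizeAndMeasure = normalize >>> pureMini (\s -> (s, length s))

-- An IO workflow that reuses the same pieces and adds a side effect
report :: MiniFlow IO String ()
report =
  normalizeAndMeasure
    >>> effectMini (\(s, n) -> putStrLn (s ++ " has " ++ show n ++ " letters"))

main :: IO ()
main = do
  -- Identity runs the effect-free parts without IO
  print (runIdentity (runKleisli normalizeAndMeasure "hello world"))
  runKleisli report "hello world"
```

The design point is that `>>>` takes two MiniFlows and returns a MiniFlow, so nothing distinguishes a "task" from a "workflow" at the type level; that is the property the paragraph above attributes to Flow.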

For example, say you write a Flow containing some complex branching logic for reporting errors based on its input values (e.g. sending a Slack message to the dev team if an upstream task reports an error). With funflow, you can simply reuse that error-reporting Flow across all of your workflows.
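A minimal sketch of that error-reporting scenario, again using Kleisli arrows as a stand-in for Flows (all names are hypothetical, and this is not funflow's API). Instead of sending a Slack message, the notifier is passed in as a parameter, so the same branching logic can be plugged into unrelated workflows.

```haskell
-- Toy model again (hypothetical names, not funflow's API).
import Control.Arrow (Kleisli (..), arr, (>>>))

type MiniFlow m a b = Kleisli m a b

-- Reusable error-reporting flow: on failure, call the notifier and yield
-- Nothing; on success, pass the value through. `notify` is hypothetical.
reportErrors :: Monad m => (String -> m ()) -> MiniFlow m (Either String a) (Maybe a)
reportErrors notify = Kleisli $ \result -> case result of
  Left err -> notify ("upstream task failed: " ++ err) >> pure Nothing
  Right x  -> pure (Just x)

-- Two unrelated "upstream tasks" that may fail
parseCount :: String -> Either String Int
parseCount s = case reads s of
  [(n, "")] -> Right n
  _         -> Left ("not a number: " ++ show s)

safeHead :: [a] -> Either String a
safeHead []      = Left "empty input"
safeHead (x : _) = Right x

-- The same reportErrors flow reused in two different workflows
countWorkflow :: Monad m => (String -> m ()) -> MiniFlow m String (Maybe Int)
countWorkflow notify = arr parseCount >>> reportErrors notify

firstWordWorkflow :: Monad m => (String -> m ()) -> MiniFlow m String (Maybe String)
firstWordWorkflow notify = arr (safeHead . words) >>> reportErrors notify

main :: IO ()
main = do
  let notifyConsole = putStrLn . ("[alert] " ++)
  runKleisli (countWorkflow notifyConsole) "42" >>= print
  runKleisli (countWorkflow notifyConsole) "forty-two" >>= print
  runKleisli (firstWordWorkflow notifyConsole) "" >>= print
```

Parameterizing over the monad also makes the shared flow easy to check without IO: with `Either String` and `Left` as the notifier, a failure surfaces as the exact message that would have been sent.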

In [3]:
-- Individual task definitions (remember that each task is also a full "Flow")
readDocument :: Flow String T.Text
readDocument = ioFlow T.readFile

countWordsAndSummarize :: Flow T.Text T.Text
countWordsAndSummarize =
    pureFlow (T.unlines . formatCounts . countWordsSortedDesc . filterWords . T.words . removePunctuation)

writeResult :: Flow (String, T.Text) ()
writeResult = ioFlow writeOutputMessage
  where
    -- The output path is ignored for now; the counts are printed instead
    writeOutputMessage :: (String, T.Text) -> IO ()
    writeOutputMessage (_outputPath, countText) = do
        T.putStrLn "Normally we would write the result to a file with T.writeFile, but for this example we can instead print the output:"
        T.putStrLn countText

-- Build the final pipeline using the task Flows defined above
--   Note: arrow syntax controls which inputs get passed to which
--   pipeline tasks, i.e. result_name <- task -< task_input
flow :: Flow (String, String) ()
flow = proc (documentFilePath, outputSummaryFilePath) -> do
    documentText <- readDocument -< documentFilePath
    countSummary <- countWordsAndSummarize -< documentText
    writeResult -< (outputSummaryFilePath, countSummary)
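To make the proc wiring above concrete, here is the same shape written over the ordinary function arrow (->) instead of Flow, so it runs without funflow. The three tasks are hypothetical stand-ins (readDocument here just fabricates text from the path), and flowCombinators shows one equivalent way to express the same data routing with explicit Control.Arrow combinators.

```haskell
{-# LANGUAGE Arrows #-}
-- Sketch: the WordCount wiring over the plain function arrow (->).
-- All three tasks are hypothetical stand-ins for the notebook's Flows.
import Control.Arrow (arr, second, (>>>))

readDocument :: String -> String        -- stand-in: no file access
readDocument path = "contents of " ++ path

summarize :: String -> Int              -- stand-in for countWordsAndSummarize
summarize = length . words

writeResult :: (String, Int) -> String  -- stand-in: describe the write
writeResult (out, n) = show n ++ " words -> " ++ out

-- Same shape as the notebook's `flow`, using proc notation
flowProc :: (String, String) -> String
flowProc = proc (documentFilePath, outputSummaryFilePath) -> do
  documentText <- readDocument -< documentFilePath
  countSummary <- summarize -< documentText
  writeResult -< (outputSummaryFilePath, countSummary)

-- Equivalent routing with combinators: swap the pair so the output path
-- comes first, process the document path, then hand the pair on
flowCombinators :: (String, String) -> String
flowCombinators =
  arr (\(doc, out) -> (out, doc)) >>> second (readDocument >>> summarize) >>> writeResult

main :: IO ()
main = do
  print (flowProc ("doc.txt", "out.txt"))
  print (flowCombinators ("doc.txt", "out.txt"))
```

The proc block mostly saves you from threading tuples by hand: each `name <- task -< input` line names an intermediate result so later steps can pick it up.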

## Run the pipeline

And finally, with our pipeline defined, we're ready to run it!

In [4]:
runFlow flow ("words.txt" :: String, "outputs/counts.txt" :: String) :: IO ()
Normally we would write the result to a file with T.writeFile, but for this example we can instead print the output:
a: 3
and: 3
it: 3
try: 3
words: 3
Lets: 2
This: 2
count: 2
give: 2
or: 2
pipeline: 2
should: 2
FILE: 1
WordCounths: 1
accept: 1
basic: 1
but: 1
complex-words: 1
contains: 1
haskell: 1
like: 1
list: 1
long: 1
not: 1
numbers: 1
of: 1
only: 1
punctuation: 1
the: 1
to: 1
using: 1
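The printed message mentions that a real pipeline would write the result with T.writeFile. A minimal sketch of such a function follows, assuming the output directory (outputs/ in the run above) may not exist yet. writeSummaryFile is a hypothetical name, and the block depends only on the text, directory, and filepath packages, not on funflow.

```haskell
{-# LANGUAGE OverloadedStrings #-}
-- Sketch of a file-writing replacement for writeOutputMessage.
-- writeSummaryFile is a hypothetical name, not part of funflow.
import qualified Data.Text as T
import qualified Data.Text.IO as T
import System.Directory (createDirectoryIfMissing)
import System.FilePath (takeDirectory)

-- Write the formatted counts to the given path, creating any missing
-- parent directories first (e.g. "outputs/")
writeSummaryFile :: (FilePath, T.Text) -> IO ()
writeSummaryFile (outputPath, countText) = do
  createDirectoryIfMissing True (takeDirectory outputPath)
  T.writeFile outputPath countText

main :: IO ()
main = do
  writeSummaryFile ("outputs/counts.txt", "a: 3\nand: 3\n")
  -- Read the file back to confirm what was written
  T.readFile "outputs/counts.txt" >>= T.putStr
```

Since its type is (String, T.Text) -> IO (), wrapping it as ioFlow writeSummaryFile should give a Flow (String, T.Text) () with the same type as writeResult, so it could be swapped in without changing flow.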