# Example: WordCount
In this example, we'll implement a simple pipeline that calculates word frequencies in a plain text document. We'll use the smart constructors `pureFlow` (for pure functions) and `ioFlow` (for functions that perform `IO`), which let us define the pipeline's tasks as ordinary Haskell functions. This example may look familiar to users of Apache Beam, which also includes a WordCount example: https://beam.apache.org/get-started/wordcount-example/.
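To build intuition for the two constructors, here is a toy model. It is *not* how funflow implements `Flow`; it treats a flow as a plain `Kleisli IO` arrow from `Control.Arrow`. The names `ToyFlow`, `toyPureFlow`, `toyIoFlow` and `toyRunFlow` are hypothetical stand-ins for funflow's `Flow`, `pureFlow`, `ioFlow` and `runFlow`.

```haskell
import Control.Arrow (Kleisli (..), arr, (>>>))
import Data.Char (toUpper)

-- Toy stand-in for funflow's Flow (an assumption for illustration only):
-- a flow from a to b is just a function a -> IO b wrapped in Kleisli.
type ToyFlow = Kleisli IO

-- Hypothetical analogue of pureFlow: lift a pure function into a flow.
toyPureFlow :: (a -> b) -> ToyFlow a b
toyPureFlow = arr

-- Hypothetical analogue of ioFlow: lift an IO action into a flow.
toyIoFlow :: (a -> IO b) -> ToyFlow a b
toyIoFlow = Kleisli

-- Hypothetical analogue of runFlow: feed an input through a flow.
toyRunFlow :: ToyFlow a b -> a -> IO b
toyRunFlow = runKleisli

-- A pure task followed by an effectful task, composed into one flow.
shout :: ToyFlow String String
shout = toyPureFlow (map toUpper) >>> toyIoFlow (\s -> putStrLn s >> return s)
```

For example, `toyRunFlow shout "hello"` prints `HELLO` and returns `"HELLO"`. In this model, a pure step and an `IO` step are values of the same type, so they compose freely.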
## Imports
First, we'll need to import some additional modules that make it easier to work with text (`Data.Text`), define dictionaries/maps (`Data.Map`), perform regex matching (`Text.Regex.Posix`), and more. We also enable the `Arrows` extension, which provides the `proc` notation used later to wire the tasks together.
```haskell
:opt no-lint
{-# LANGUAGE Arrows #-}
{-# LANGUAGE OverloadedStrings #-}
import Data.List (sortBy)
import Data.Map (Map)
import qualified Data.Map as Map
import Data.Ord (comparing)
import qualified Data.Text.IO as T
import qualified Data.Text as T
import Text.Printf (printf)
import Text.Regex.Posix ((=~))
import Funflow
```
## Helper Functions
Since we're writing our pipeline with function-based `Flow`s, we first need to define the functions that parse the input text and count words.
```haskell
-- | (word, number of occurrences)
type TextCount = (T.Text, Int)

-- | Counts how many times each word occurs in a list.
-- Results are in ascending word order (the order of Map.toList).
countWords :: [T.Text] -> [TextCount]
countWords ws =
  let
    tally :: T.Text -> Map T.Text Int -> Map T.Text Int
    tally k = Map.insertWith (+) k 1
  in
    Map.toList $ foldr tally Map.empty ws

-- | Removes punctuation marks from a text
removePunctuation :: T.Text -> T.Text
removePunctuation =
  let
    punctuation = ",.?!:;\"\'" :: String
  in T.filter (not . (`elem` punctuation))

-- | Keeps only words made up entirely of Latin letters (hyphens are allowed)
filterWords :: [T.Text] -> [T.Text]
filterWords =
  let
    wordsRegex = "^[A-Za-z-]+$" :: String
  in filter $ (=~ wordsRegex) . T.unpack

-- | Sorts a list of word counts in descending order of count.
-- sortBy is stable, so words with equal counts keep their original order.
sortCountsDesc :: [TextCount] -> [TextCount]
sortCountsDesc = sortBy (flip $ comparing snd)

-- | Like countWords, but with results sorted by descending count
countWordsSortedDesc :: [T.Text] -> [TextCount]
countWordsSortedDesc = sortCountsDesc . countWords

-- | Formats each word count as a "word: n" line for printing
formatCounts :: [TextCount] -> [T.Text]
formatCounts = map (\(w, c) -> T.pack $ printf "%s: %d" (T.unpack w) c)
```
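To see what these helpers do together, here is a self-contained sketch of the same pure pipeline applied to a short string. To avoid depending on `regex-posix`, it swaps the regex in `filterWords` for a character test that should behave the same on ASCII input. It also uses `Map.fromListWith` in place of the `foldr`/`insertWith` tally and `show` in place of `printf`. The names `stripPunct`, `isLatinWord` and `summarize` are hypothetical.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Data.Char (isAsciiLower, isAsciiUpper)
import Data.List (sortBy)
import qualified Data.Map as Map
import Data.Ord (comparing)
import qualified Data.Text as T

-- Same idea as removePunctuation above.
stripPunct :: T.Text -> T.Text
stripPunct = T.filter (`notElem` (",.?!:;\"'" :: String))

-- Stand-in for the regex ^[A-Za-z-]+$ : non-empty, only ASCII letters or hyphens.
isLatinWord :: T.Text -> Bool
isLatinWord w =
  not (T.null w) && T.all (\c -> isAsciiUpper c || isAsciiLower c || c == '-') w

-- Split, filter, count, sort by descending count (stable, so ties keep the
-- Map's ascending key order), then format one "word: n" line per entry.
summarize :: T.Text -> [T.Text]
summarize txt =
  let ws = filter isLatinWord (T.words (stripPunct txt))
      counts = Map.toList (Map.fromListWith (+) [(w, 1 :: Int) | w <- ws])
      sorted = sortBy (flip (comparing snd)) counts
  in [w <> ": " <> T.pack (show n) | (w, n) <- sorted]
```

For instance, `summarize "The cat saw the cat. A cat, 2 dogs!"` drops `2` and returns `["cat: 3","A: 1","The: 1","dogs: 1","saw: 1","the: 1"]`. Uppercase words sort before lowercase ones among ties because `Text` keys are ordered by character code. This is also why `Lets` and `This` precede `count` in the pipeline output below.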
## Pipeline Definition
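With our core utility functions defined, we're ready to define our pipeline. One simple way to structure it is to divide it into three tasks:

1. Read the document (`readDocument`).
2. Count the words and summarize the counts (`countWordsAndSummarize`).
3. Write the result (`writeResult`).
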
Remember that in `funflow` we create flows, each containing at least one task, and combine them into a final, larger `Flow` DAG. This differs from how DAGs are constructed in other workflow frameworks such as Apache Airflow, where the tasks are what get composed together. The advantage of making entire `Flow` DAGs composable is that you can reuse whole subsections of your workflow, not just individual tasks. In other words, because a combination of `Flow`s is itself a `Flow`, anything you build can in turn be reused as a building block.
For example, say that you write a `Flow` containing some complex branching logic for reporting errors based on its input values (e.g. sending a Slack message to the dev team if an upstream task reports an error). With `funflow`, you can simply reuse that error-reporting `Flow` across all of your workflows.
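The reuse idea can be sketched with the same kind of toy model: a flow is treated as a `Kleisli IO` arrow, which is an assumption and not funflow's `Flow` type. The reporting sub-flow `reportIfEmpty` and the two pipelines `wordPipeline` and `evenPipeline` are hypothetical. Writing a warning to stderr stands in for something like a Slack notification.

```haskell
import Control.Arrow (Kleisli (..), arr, (>>>))
import Control.Monad (when)
import System.IO (hPutStrLn, stderr)

-- Toy model (assumption): a flow is a Kleisli IO arrow, not funflow's Flow.
type ToyFlow = Kleisli IO

-- Hypothetical reusable sub-flow: warn on stderr when its input list is empty,
-- then pass the input through unchanged.
reportIfEmpty :: String -> ToyFlow [a] [a]
reportIfEmpty label = Kleisli $ \xs -> do
  when (null xs) $ hPutStrLn stderr ("warning: " ++ label ++ " produced no data")
  return xs

-- Two unrelated pipelines that both reuse the same reporting sub-flow.
wordPipeline :: ToyFlow String Int
wordPipeline = arr words >>> reportIfEmpty "word split" >>> arr length

evenPipeline :: ToyFlow [Int] Int
evenPipeline = arr (filter even) >>> reportIfEmpty "even filter" >>> arr sum
```

Here `runKleisli evenPipeline [1, 3, 5]` prints a warning and returns `0`. Because `reportIfEmpty` is polymorphic in the element type, it slots into both pipelines without changes.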
```haskell
-- Individual task definitions (remember that each task is also a full Flow)
readDocument :: Flow String T.Text
readDocument = ioFlow T.readFile

countWordsAndSummarize :: Flow T.Text T.Text
countWordsAndSummarize =
  pureFlow (T.unlines . formatCounts . countWordsSortedDesc . filterWords . T.words . removePunctuation)

writeResult :: Flow (String, T.Text) ()
writeResult =
  let
    -- The output path is unused here because this example prints instead of writing a file.
    writeOutputMessage :: (String, T.Text) -> IO ()
    writeOutputMessage (_outputPath, countText) = do
      T.putStrLn "Normally we would write the result to a file with T.writeFile, but for this example we can instead print the output:"
      T.putStrLn countText
  in ioFlow writeOutputMessage

-- Build the final pipeline from the task Flows defined above.
-- Arrow syntax controls which inputs are passed to which task:
--   result_name <- task -< task_input
flow :: Flow (String, String) ()
flow = proc (documentFilePath, outputSummaryFilePath) -> do
  documentText <- readDocument -< documentFilePath
  countSummary <- countWordsAndSummarize -< documentText
  writeResult -< (outputSummaryFilePath, countSummary)
```
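To make the arrow notation less magical, the sketch below wires three stand-in tasks the same way `flow` does. It then writes the same wiring with explicit `Control.Arrow` combinators. Assumptions: flows are modelled as `Kleisli IO` arrows rather than funflow's `Flow`; the tasks `readDoc`, `summarizeDoc` and `writeRes` are hypothetical and pure, with `writeRes` returning a string so the result can be checked. GHC's actual desugaring of `proc` differs in detail, but it computes the same thing.

```haskell
{-# LANGUAGE Arrows #-}
import Control.Arrow (Kleisli (..), arr, second, (>>>))

-- Toy model (assumption): a flow is a Kleisli IO arrow, not funflow's Flow.
type ToyFlow = Kleisli IO

-- Hypothetical stand-ins for readDocument, countWordsAndSummarize and writeResult.
readDoc :: ToyFlow String String
readDoc = arr (\path -> "contents of " ++ path) -- pretends to read a file

summarizeDoc :: ToyFlow String Int
summarizeDoc = arr (length . words)

writeRes :: ToyFlow (String, Int) String
writeRes = arr (\(out, n) -> out ++ " <- " ++ show n)

-- Arrow notation, wired like the tutorial's `flow`.
flowProc :: ToyFlow (String, String) String
flowProc = proc (docPath, outPath) -> do
  docText <- readDoc -< docPath
  summary <- summarizeDoc -< docText
  writeRes -< (outPath, summary)

-- The same wiring with combinators: swap the pair, run the document path
-- through readDoc >>> summarizeDoc while carrying the output path along,
-- then hand both to writeRes.
flowCombinators :: ToyFlow (String, String) String
flowCombinators =
  arr (\(docPath, outPath) -> (outPath, docPath))
    >>> second (readDoc >>> summarizeDoc)
    >>> writeRes
```

Both versions return `"out.txt <- 3"` for the input `("words.txt", "out.txt")`. The `proc` form names each intermediate result, so it stays readable as the number of tasks grows.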
## Run the pipeline
And finally, with our pipeline defined, we're ready to run it!
```haskell
runFlow flow ("words.txt" :: String, "outputs/counts.txt" :: String) :: IO ()
```
```
Normally we would write the result to a file with T.writeFile, but for this example we can instead print the output:
a: 3
and: 3
it: 3
try: 3
words: 3
Lets: 2
This: 2
count: 2
give: 2
or: 2
pipeline: 2
should: 2
FILE: 1
WordCounths: 1
accept: 1
basic: 1
but: 1
complex-words: 1
contains: 1
haskell: 1
like: 1
list: 1
long: 1
not: 1
numbers: 1
of: 1
only: 1
punctuation: 1
the: 1
to: 1
using: 1
```
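The printed message mentions that a real pipeline would write the summary with `T.writeFile`. A minimal sketch of such a function is shown below; the name `writeOutputFile` is hypothetical. It creates the parent directory first (e.g. `outputs/`), because writing fails if that directory does not exist.

```haskell
import qualified Data.Text as T
import qualified Data.Text.IO as T
import System.Directory (createDirectoryIfMissing)
import System.FilePath (takeDirectory)

-- Hypothetical file-writing replacement for writeOutputMessage:
-- ensure the parent directory exists, then write the summary text.
writeOutputFile :: (String, T.Text) -> IO ()
writeOutputFile (outputPath, countText) = do
  createDirectoryIfMissing True (takeDirectory outputPath)
  T.writeFile outputPath countText
```

Wrapping it the same way the tutorial wraps `writeOutputMessage` (`writeResult = ioFlow writeOutputFile`) should make the run above write `outputs/counts.txt` instead of printing.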