author     YvesPares <>  2019-10-09 10:16:00 (GMT)
committer  hdiff <hdiff@hdiff.luite.com>  2019-10-09 10:16:00 (GMT)
commit     6040ebb617d31f488a816989f9e1f4435137115a (patch)
tree       656c279769001d7487ae9a1e2a5aff2ae50b3877
-rw-r--r--  LICENSE                                     21
-rw-r--r--  examples/example-Poke/ExamplePokeAPI.hs    112
-rw-r--r--  examples/example-Stock/ExampleStockAPI.hs  127
-rw-r--r--  porcupine-http.cabal                       118
-rw-r--r--  src/Data/Locations/Accessors/HTTP.hs       159
5 files changed, 537 insertions, 0 deletions
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..2b55c2c
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2018 Tweag I/O, NovaDiscovery
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/examples/example-Poke/ExamplePokeAPI.hs b/examples/example-Poke/ExamplePokeAPI.hs
new file mode 100644
index 0000000..fb6b1f3
--- /dev/null
+++ b/examples/example-Poke/ExamplePokeAPI.hs
@@ -0,0 +1,112 @@
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE DeriveAnyClass #-}
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE DuplicateRecordFields #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE OverloadedLabels #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TypeApplications #-}
+{-# LANGUAGE TupleSections #-}
+{-# LANGUAGE Arrows #-}
+
+-- Don't forget to map the virtual locations to HTTP URLs in the
+-- 'example-pokeapi.yaml' file generated by calling
+-- 'example-pokeapi write-config-template'.
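+--
+-- As a rough sketch (the exact layout comes from the generated template, so
+-- treat the key names below as indicative only), the mapping could look like:
+--
+--   locations:
+--     /Inputs/Pokemon: https://pokeapi.co/api/v2/pokemon/{pokemonId}
+--     /Outputs: example-pokeapi_files
+--
+-- where {pokemonId} is filled in for each repetition introduced by parMapTask.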
+
+import Control.Monad
+import Data.Aeson
+import Data.DocRecord
+import qualified Data.Text as T
+import GHC.Generics
+import Porcupine
+import Prelude hiding (id, (.))
+import Graphics.Vega.VegaLite as VL
+
+import Data.Locations.Accessors.HTTP
+
+
+-- | The type of our raw data, read from the REST API
+data Pokemon = Pokemon { pkName :: !T.Text
+ , pkMoves :: ![T.Text]
+ , pkTypes :: ![T.Text] }
+ deriving (Generic, Store) -- Store makes them cacheable
+
+instance FromJSON Pokemon where
+ parseJSON = withObject "Pokemon" $ \o -> Pokemon
+ <$> o .: "name"
+ <*> (o .: "moves" >>= mapM ((.: "move") >=> (.: "name")))
+ <*> (o .: "types" >>= mapM ((.: "type") >=> (.: "name")))
+
+-- | How to load pokemons
+--
+-- See https://pokeapi.co/api/v2/pokemon/25 for instance
+pokemonFile :: DataSource Pokemon
+pokemonFile = usesCacherWithIdent 12345 $ -- This says we want to support
+                                          -- caching of the fetched data
+              clockVFileAccesses $ -- This says we want to record the time
+                                   -- taken to read the data
+ dataSource ["Inputs", "Pokemon"]
+ (somePureDeserial JSONSerial)
+
+-- | One over-simplified intermediate result type
+newtype Analysis = Analysis { moveCount :: Int }
+ deriving (Generic, ToJSON)
+
+-- | How to write analysis
+analysisFile :: DataSink Analysis
+analysisFile = dataSink ["Outputs", "Analysis"]
+ (somePureSerial JSONSerial)
+
+-- | Where to write the final summary visualization
+vlSummarySink :: DataSink VegaLite
+vlSummarySink = dataSink ["Outputs", "Summary"]
+ (lmap VL.toHtml (somePureSerial $ PlainTextSerial $ Just "html")
+ <>
+ lmap VL.fromVL (somePureSerial JSONSerial))
+
+-- | Create the vega-lite specification of the visualization we want
+writeSummary :: (LogThrow m) => PTask m [Pokemon] ()
+writeSummary = proc pkmn -> do
+ let dat = dataFromColumns []
+ . dataColumn "name" (Strings $ map pkName pkmn)
+ . dataColumn "numMoves" (Numbers $ map (fromIntegral . length . pkMoves) pkmn)
+
+ enc = encoding
+ . position X [ PName "name", PmType Nominal ]
+ . position Y [ PName "numMoves", PmType Quantitative ] -- , PAggregate Mean ]
+
+ spec = toVegaLite [ dat [], mark Bar [], enc [] ]
+ writeData vlSummarySink -< spec
+
+-- | The task combining the three previous operations: load a Pokemon, analyze
+-- it and write the analysis.
+--
+-- This task may look very opaque from the outside, having no parameters. But
+-- we will be able to reuse it over different Pokemon ids without having to
+-- change it at all.
+analyzeOnePokemon :: (LogThrow m) => PTask m a Pokemon
+analyzeOnePokemon =
+ loadData pokemonFile >>> (arr analyzePokemon >>> writeData analysisFile) &&& id >>> arr snd
+ where
+ analyzePokemon = Analysis . length . pkMoves -- Just count number of moves
+
+mainTask :: (LogThrow m) => PTask m () ()
+mainTask =
+  -- First we get the ids of the Pokemon that we want to analyze. We need only
+  -- one field that will contain a range of values, see IndexRange. By default,
+  -- this range contains just one value, 1.
+ getOption ["Settings"] (docField @"pokemonIds" (oneIndex (1::Int)) "The indices of the pokemon to load")
+ -- We turn the range we read into a full lazy list:
+ >>> arr enumTRIndices
+  -- Then we just map over these ids and call analyzeOnePokemon each time:
+ >>> parMapTask "pokemonId" analyzeOnePokemon
+ >>> writeSummary
+
+main :: IO ()
+main = runPipelineTask (FullConfig "example-pokeapi"
+ "porcupine-http/examples/example-Poke/example-pokeapi.yaml"
+ "example-pokeapi_files"
+ ())
+ ( #http <-- useHTTP
+ -- We just add #http on top of the baseContexts.
+ :& baseContexts "")
+ mainTask ()
diff --git a/examples/example-Stock/ExampleStockAPI.hs b/examples/example-Stock/ExampleStockAPI.hs
new file mode 100644
index 0000000..41022a4
--- /dev/null
+++ b/examples/example-Stock/ExampleStockAPI.hs
@@ -0,0 +1,127 @@
+{-# LANGUAGE Arrows #-}
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE DeriveAnyClass #-}
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE DuplicateRecordFields #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE OverloadedLabels #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TypeApplications #-}
+
+import Data.Aeson
+import Data.DocRecord
+import qualified Data.Text as T
+import GHC.Generics
+import Porcupine
+
+import Prelude hiding (id, (.))
+
+import Data.Locations.Accessors.HTTP
+import Graphics.Vega.VegaLite
+
+
+
+
+data StockDaily = StockDaily { date :: String , close :: Double} deriving (Generic)
+instance FromJSON StockDaily
+instance ToJSON StockDaily
+
+
+data Stock = Stock { chart :: [StockDaily] } deriving (Generic)
+instance FromJSON Stock
+instance ToJSON Stock
+
+
+getCloseStock :: Stock -> [Double]
+getCloseStock s = map close (chart s)
+
+getDateStock :: Stock -> [String]
+getDateStock s = map date (chart s)
+
+-- | How to load Stock prices
+stockFile :: DataSource Stock
+stockFile = dataSource ["Inputs", "Stock"]
+ (somePureDeserial JSONSerial)
+
+-- As an example, we have downloaded the Apple stock information from
+-- https://api.iextrading.com/1.0/stock/aapl/batch?types=chart&range=1y
+-- You can replace "aapl" by the NASDAQ ticker symbol of any other company.
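+--
+-- The pipeline only needs the "chart" array and, within each entry, the "date"
+-- and "close" fields; any extra fields returned by the API are ignored by the
+-- generic FromJSON instances. Roughly (values made up for illustration):
+--
+--   { "chart": [ { "date": "2018-01-02", "close": 172.26 }
+--              , { "date": "2018-01-03", "close": 172.23 } ] }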
+
+
+stockToVegaLite :: Stock -> VLSpec
+stockToVegaLite stock =
+ let dat = dataFromColumns []
+ . dataColumn "Date" (Strings (map T.pack (getDateStock stock) ) )
+ . dataColumn "Price" (Numbers (getCloseStock stock) )
+ enc = encoding
+ . position X [ PName "Date", PmType Temporal ]
+ . position Y [ PName "Price", PmType Quantitative]
+ in (fromVL . toVegaLite) [ dat [], width 800 , height 500 , mark Line [], enc [] ]
+
+stockSmoothed :: DataSink Stock
+stockSmoothed = dataSink ["Outputs", "StockSmoothed"]
+ (somePureSerial JSONSerial)
+
+stockVegaLite :: DataSink VLSpec
+stockVegaLite = dataSink ["Outputs", "StockSmoothedVegaLite"]
+ (somePureSerial JSONSerial)
+
+-- We use sliding windows to smooth the curve (see the evaluated examples
+-- after the helper definitions below).
+
+
+ave :: [Double] -> Double
+ave list = let s = sum list
+ n = fromIntegral (length list)
+ in s/n
+
+msliding :: Int -> [a] -> [[a]]
+msliding n p = case p of
+ [] -> []
+ (_:xs) -> [take n p] ++ (msliding n xs)
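+
+-- A couple of evaluated examples (doctest-style) to illustrate what these
+-- helpers compute:
+--
+-- >>> msliding 2 [1,2,3,4]
+-- [[1,2],[2,3],[3,4],[4]]
+-- >>> ave [1,2,3,4]
+-- 2.5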
+
+-- | The simple computation we want to perform
+computeSmoothedCurve :: Stock -> Stock
+computeSmoothedCurve stock = Stock { chart = [ StockDaily { date = d , close = p} | (d,p) <- datePriceZipped ] } where
+ price = getCloseStock stock
+  -- NOTE: a window of 1 leaves each point unchanged; widen it to actually smooth
+  priceSmoothed = map ave (msliding 1 price)
+ datePriceZipped = zip (getDateStock stock) priceSmoothed
+
+analyseStock :: (LogThrow m) => PTask m () ()
+analyseStock =
+ loadData stockFile
+ >>> arr computeSmoothedCurve
+ >>> (proc s -> do
+ writeData stockSmoothed -< s
+ writeData stockVegaLite -< stockToVegaLite s)
+ -- >>> arr (\s -> (s , stockToVegaLite s ))
+ -- >>> (writeData stockSmoothed *** writeData stockVegaLite)
+ -- >>> arr (const ())
+
+mainTask :: (LogThrow m) => PTask m () ()
+mainTask =
+  getOption ["Settings"]
+    (docField @"idcompany" ["aapl"::TRIndex] "The NASDAQ symbol of the company to load")
+ >>> parMapTask_ "idcompany" analyseStock
+
+
+-- globalMatrix :: DataSink (Tabular [[Double]])
+-- globalMatrix = dataSink [ "Outputs" , "globalData"]
+-- (somePureSerial (CSVSerial (T.pack "csv") False ','))
+--
+-- putallStocks :: [SlidingWindows] -> Tabular [[Double]]
+-- putallStocks s = Tabular Nothing (map smoothcurve s)
+--
+-- analyseStocks :: (LogThrow m) => PTask m () ()
+-- analyseStocks =
+-- arr (const (S.each ["aapl" , "fb" , "googl"])) >>> loadDataStream "company" stockFile
+-- >>> arr (S.map (\(idx,stock) -> (idx, computeSmoothedCurve stock)))
+-- >>> toPTask (S.toList_)
+-- >>> arr (map snd)
+-- >>> arr putallStocks
+-- >>> writeData globalMatrix
+
+main :: IO ()
+main = runPipelineTask (FullConfig "example-stock" "porcupine-http/examples/example-Stock/example-stock.yaml" "porcupine-http/examples/example-Stock/data" ())
+ (#http <-- useHTTP :& baseContexts "")
+ mainTask ()
diff --git a/porcupine-http.cabal b/porcupine-http.cabal
new file mode 100644
index 0000000..92a3b40
--- /dev/null
+++ b/porcupine-http.cabal
@@ -0,0 +1,118 @@
+cabal-version: 1.12
+
+-- This file has been generated from package.yaml by hpack version 0.32.0.
+--
+-- see: https://github.com/sol/hpack
+--
+-- hash: 800cd688519c710041a03d06e4bfafcf5ab8b4c36615955eafa284caeceb23c9
+
+name: porcupine-http
+version: 0.1.0.0
+synopsis: A location accessor for porcupine to connect to HTTP sources/sinks
+description: Gives a porcupine task pipeline access to HTTP URLs (GET, POST or
+ PUT). Implements a specific location type (represented as JSON object) to pack
+ together the url and some common HTTP header fields (method, content-type,
+ etc.). See the README at <https://github.com/tweag/porcupine#README.md> and
+ the examples in the `porcupine-http` package.
+category: Data, Arrows, Combinators, Control, Web
+homepage: https://github.com/tweag/porcupine#readme
+bug-reports: https://github.com/tweag/porcupine/issues
+maintainer: Yves Parès <yves.pares@tweag.io>
+copyright: 2018 EURL Tweag, NovaDiscovery
+license: MIT
+license-file: LICENSE
+build-type: Simple
+
+source-repository head
+ type: git
+ location: https://github.com/tweag/porcupine
+
+library
+ exposed-modules:
+ Data.Locations.Accessors.HTTP
+ other-modules:
+ Paths_porcupine_http
+ hs-source-dirs:
+ src
+ ghc-options: -Wall
+ build-depends:
+ aeson
+ , base >=4.10 && <5
+ , bytestring
+ , conduit
+ , containers
+ , http-client
+ , http-conduit
+ , mime-types
+ , porcupine-core ==0.1.*
+ , reader-soup ==0.1.*
+ , resourcet
+ , safe-exceptions
+ , streaming
+ , streaming-bytestring
+ , streaming-conduit
+ , text
+ , transformers
+ default-language: Haskell2010
+
+executable example-pokeapi
+ main-is: ExamplePokeAPI.hs
+ other-modules:
+ Paths_porcupine_http
+ hs-source-dirs:
+ examples/example-Poke
+ ghc-options: -Wall
+ build-depends:
+ aeson
+ , base >=4.10 && <5
+ , bytestring
+ , conduit
+ , containers
+ , docrecords
+ , http-client
+ , http-conduit
+ , hvega
+ , mime-types
+ , porcupine-core
+ , porcupine-http
+ , reader-soup ==0.1.*
+ , resourcet
+ , safe-exceptions
+ , streaming
+ , streaming-bytestring
+ , streaming-conduit
+ , text
+ , transformers
+ , unordered-containers
+ default-language: Haskell2010
+
+executable example-stock
+ main-is: ExampleStockAPI.hs
+ other-modules:
+ Paths_porcupine_http
+ hs-source-dirs:
+ examples/example-Stock
+ ghc-options: -Wall
+ build-depends:
+ aeson
+ , base >=4.10 && <5
+ , bytestring
+ , conduit
+ , containers
+ , docrecords
+ , http-client
+ , http-conduit
+ , hvega
+ , mime-types
+ , porcupine-core
+ , porcupine-http
+ , reader-soup ==0.1.*
+ , resourcet
+ , safe-exceptions
+ , streaming
+ , streaming-bytestring
+ , streaming-conduit
+ , text
+ , transformers
+ , unordered-containers
+ default-language: Haskell2010
diff --git a/src/Data/Locations/Accessors/HTTP.hs b/src/Data/Locations/Accessors/HTTP.hs
new file mode 100644
index 0000000..1ad2f9b
--- /dev/null
+++ b/src/Data/Locations/Accessors/HTTP.hs
@@ -0,0 +1,159 @@
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE DeriveAnyClass #-}
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE DeriveTraversable #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE NamedFieldPuns #-}
+{-# LANGUAGE OverloadedLabels #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE PatternSynonyms #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE TypeFamilies #-}
+{-# OPTIONS_GHC "-fno-warn-orphans" #-}
+{-# OPTIONS_GHC "-fno-warn-name-shadowing" #-}
+
+module Data.Locations.Accessors.HTTP where
+
+import Control.Exception.Safe
+import Control.Monad.ReaderSoup
+import Control.Monad.Trans.Resource
+import Data.Aeson
+import qualified Data.ByteString.Streaming as BSS
+import Data.Function ((&))
+import Data.Locations.Accessors
+import Data.Locations.Loc
+import qualified Data.Map.Strict as Map
+import Data.Maybe
+import qualified Data.Text as T
+import qualified Data.Text.Encoding as TE
+import GHC.Generics (Generic)
+import Network.HTTP.Client (responseTimeoutMicro)
+import Network.HTTP.Client.Internal (Request (..))
+import Network.HTTP.Simple
+import qualified Network.Mime as Mime
+import Streaming
+import qualified Streaming.Conduit as SC
+
+
+-- | The context is just a dummy one for now, but we might want to add, for
+-- instance, a 'Manager' in the future.
+data HTTPContext = HTTPContext
+
+type instance ContextFromName "http" = HTTPContext
+
+instance SoupContext HTTPContext (ReaderT HTTPContext) where
+ toReaderT = id
+ fromReaderT = id
+
+useHTTP :: ContextRunner (ReaderT HTTPContext) m
+useHTTP = ContextRunner $ flip runReaderT HTTPContext
+
+makeReq :: MonadThrow m => Loc -> m Request
+makeReq loc@RemoteFile{rfProtocol=p}
+ | p == "http" || p == "https" = parseRequest $ show loc
+makeReq loc = error $ show loc ++ " isn't an http(s) URL"
+
+instance (MonadResource m, MonadMask m)
+ => LocationAccessor m "http" where
+ data GLocOf "http" a = HTTPLoc
+ { url :: URL a
+ , writeMethod :: T.Text
+ , readMethod :: T.Text
+ , serial :: Maybe T.Text
+ , acceptContentType :: Maybe T.Text
+ , timeout :: Maybe Int -- In microseconds
+ } deriving (Functor, Foldable, Traversable, Generic, ToJSON)
+ locExists _ = return True
+ writeBSS l bss = do
+ req <- makeReq $ url l
+ (bs :> r) <- BSS.toLazy bss
+ _ <- httpNoBody $
+ req & setRequestMethod (TE.encodeUtf8 $ writeMethod l)
+ & setRequestBodyLBS bs
+ & setRequestCheckStatus
+ & setTimeout l
+ & maybeUpdate
+ (setRequestHeader "Content-type" . (:[]))
+ (TE.encodeUtf8 <$> acceptContentType l)
+ return r
+ readBSS l f = do
+ req <- makeReq $ url l
+ let
+ req' = req
+ & setRequestMethod (TE.encodeUtf8 $ readMethod l)
+ & setTimeout l
+ & setRequestCheckStatus
+ & maybeUpdate
+ (setRequestHeader "Accept" . (:[]))
+ (TE.encodeUtf8 <$> acceptContentType l)
+ f $ SC.toBStream $ httpSource req' getResponseBody
+
+instance (MonadResource m, MonadMask m) => MayProvideLocationAccessors m "http"
+
+-- |
+-- Sets a timeout, obtained from the location
+--
+-- NOTE: We use an internal field of 'Request' to do so. We should probably
+-- look into whether there is a better-supported way to set a timeout
+-- (probably by setting it on the Manager).
+setTimeout :: GLocOf "http" a -> Request -> Request
+setTimeout = maybeUpdate (\t r -> r{responseTimeout=responseTimeoutMicro t})
+ . timeout
+
+-- |
+-- Extract the mime type out of a file extension
+getMimeType :: T.Text -> Maybe T.Text
+getMimeType ext =
+ TE.decodeUtf8 <$> flip Map.lookup Mime.defaultMimeMap ext
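+
+-- For instance, with the default table from mime-types, "json" should resolve
+-- to "application/json":
+--
+-- >>> getMimeType "json"
+-- Just "application/json"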
+
+-- |
+-- @maybeUpdate f mY x@ applies @f y@ to @x@ when @mY@ is @Just y@, and returns
+-- @x@ unchanged when @mY@ is @Nothing@.
+--
+-- This is useful for optionally overriding a field in a record.
+maybeUpdate :: (b -> a -> a) -> Maybe b -> a -> a
+maybeUpdate f = flip (foldr f)
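+
+-- A quick illustration (the foldr over Maybe applies the function at most once):
+--
+-- >>> maybeUpdate (+) (Just 1) 10
+-- 11
+-- >>> maybeUpdate (+) Nothing 10
+-- 10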
+
+instance (IsLocString a) => Show (GLocOf "http" a) where
+ show = show . url
+
+getURLType :: URL a -> Maybe T.Text
+getURLType url = case getLocType url of
+ "" -> Nothing
+ ext -> Just $ T.pack ext -- TODO: check that the extension is a valid one
+ -- (from the list in mime-types)
+
+instance (IsLocString a) => FromJSON (GLocOf "http" a) where
+ parseJSON (Object v) = do
+ url <- v .: "url"
+ extension <- (Just <$> v .: "serial") <|> pure (getURLType url)
+ let fallbackMimeType = case extension of
+ Nothing -> pure Nothing
+ Just ext -> case getMimeType ext of
+ Nothing ->
+                fail $ "The extension " <> T.unpack ext <>
+                  " has no default mime-type associated to it and you didn't" <>
+                  " explicitly supply one via \"acceptContentType\""
+ Just typ -> pure (Just typ)
+ HTTPLoc url <$> (v .: "writeMethod" <|> pure "POST")
+ <*> (v .: "readMethod" <|> pure "GET")
+ <*> pure extension
+ <*> ((Just <$> v .: "acceptContentType") <|> fallbackMimeType)
+ <*> (v .:? "timeout")
+ parseJSON v@(String _) = do
+ url <- parseJSON v
+ case url of
+ RemoteFile{rfProtocol=p}
+ | p == "http" || p == "https" ->
+ let extension = getURLType url in
+ return $ HTTPLoc url "POST" "GET" extension (getMimeType =<< extension) Nothing
+ _ -> fail "Doesn't use http(s) protocol"
+ parseJSON _ = fail
+ "Must be an http(s) URL or a JSON object with fields url,writeMethod,readMethod"
+
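+-- For reference, a location for the "http" accessor in the YAML/JSON config can
+-- thus be either a bare URL string or an object along these lines (illustrative
+-- values; every field but "url" is optional, methods default to POST and GET):
+--
+--   { "url": "https://example.com/api/data"
+--   , "writeMethod": "PUT"
+--   , "serial": "json"
+--   , "timeout": 5000000 }
+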
+instance TypedLocation (GLocOf "http") where
+ getLocType l = T.unpack . fromMaybe "" $ serial l
+ setLocType l f = l{serial = Just . T.pack . f $ getLocType l}
+ addSubdirToLoc l d = l{url = addSubdirToLoc (url l) d}
+ useLocAsPrefix l p = l{url = useLocAsPrefix (url l) p}