Benutzer:MovGP0/F♯/Async

aus Wikipedia, der freien Enzyklopädie
   MovGP0        Über mich        Hilfen        Artikel        Weblinks        Literatur        Zitate        Notizen        Programmierung        MSCert        Physik      


Events

open System
let form = new Form(Text = "F# Events", Visible = true, TopMost = true)

// register Event Handler
form.Click.Add(fun args -> printfn "Clicked")

// filter events
form.MouseDown
|> Event.filter (fun args -> args.X < 50)
|> Event.map (fun args -> printfn "%d %d" args.X args.Y)

Application.Run(form)

Background Workers

open System.Threading
open System.ComponentModel

// define worker that can be cancelled
let worker = new BackgroundWorker(WorkerSupportsCancellation = true)

// define process that can be cancelled
worker.DoWork(fun args -> 
   for i in 1..50 do
      if (worker.CancellationPending = false)
      then
         Thread.Sleep 1000
         printf "%A" i
)

// define process that continues when cancelled
worker.DoWork.Add(fun args -> 
   for i in 1..10 do
      Thread.Sleep 500
      printfn "B: %A" i
)

worker.RunWorkerCompleted.Add(fun args -> printfn "Completed.")

// start worker
worker.RunWorkerAsync()

// cancel worker
Thread.Sleep 1500
worker.CancelAsync()

Threads

open System.Threading

let thread = new Thread(fun () -> 
   for i in 1..10 do
      try
         Thread.Sleep 1000
      with
         | :? ThreadAbortException as ex -> prinfn "Exception %A" ex
      printfn "I'm still running"
)
thread.Start ()


open System.Threading

let runMe () = 
   for i in 1..10 do
      try
         Thread.Sleep 1000
      with
         | :? ThreadAbortException as ex -> prinfn "Exception %A" ex
      printfn "I'm still running"

let createThread () = 
   let thread = new Thread(runMe)
   thread.Start ()

createThread()
createThread()

Thread Pools

open System.Threading

let runMe (arg:object) = 
   for i in 1..10 do
      try
         Thread.Sleep 1000
      with
         | :? ThreadAboardException as ex -> printfn "Exception %A" ex
      printfn "%A still running"

TreadPool.QueueUserWorkItem(new WaitCallback(runMe), "one")
TreadPool.QueueUserWorkItem(new WaitCallback(runMe), "two")
TreadPool.QueueUserWorkItem(new WaitCallback(runMe), "three")

Asynchronous Computing

  • based on the Asynchronous Programming Model (APM)[1]
Asynchronous Workflows
// invoke Async.Start(fun () -> expression)
async {
   expression
}
Asynchronous Binding
  • uses the let! keyword
// invoke Async.Start(fun () -> expression)
let! response = request.GetResponseAsync()
Example
open System
open System.Net
open Microsoft.FSharp.Control.WebExtensions

///<summary>Stock symbol and URL to Yahoo finance</summary>
let urlList = [
      "MSFT", "http://ichart.finance.yahoo.com/table.csv?s=MSFT&d=6&e=6&f=2013&g=d&a=1&b=1&c=2010&ignore=.csv"
      "GOOG", "http://ichart.finance.yahoo.com/table.csv?s=GOOG&d=6&e=6&f=2013&g=d&a=1&b=1&c=2010&ignore=.csv"
      "EBAY", "http://ichart.finance.yahoo.com/table.csv?s=EBAY&d=6&e=6&f=2013&g=d&a=1&b=1&c=2010&ignore=.csv"
      "AAPL", "http://ichart.finance.yahoo.com/table.csv?s=AAPL&d=6&e=6&f=2013&g=d&a=1&b=1&c=2010&ignore=.csv"
      "ADBE", "http://ichart.finance.yahoo.com/table.csv?s=ADBE&d=6&e=6&f=2013&g=d&a=1&b=1&c=2010&ignore=.csv"
   ]

///<summary>Parse CSV and extract max price</summary>
let getMaxPrice (data:string) = 
   data.Split('\n')
   |> Seq.skip 1 
   |> Seq.take (rows.Length - 2)
   |> Seq.map (fun s -> s.Split(','))
   |> Seq.map (fun s -> float s.[4]) // get price
   |> Seq.max

///<summary>Async fetch of csv data</summary>
let fetchAsync (name, url:string) = 
   async {
      try
         let uri = new Uri(url)
         let webClient = new WebClient()
         let! csv = webClient.AsyncDownloadString uri
         let maxPrice = getMaxPrice(csv.ToString())
         printfn "Downloaded historical data for %s. Received %d characters. Max closing price since 2010-01-01 is %f" name csv.Length maxprice
      with
         | ex -> printfn "Exception: %s" ex.Message
   }

let runAll () = 
   urlList
   |> Seq.map fetchAsync
   |> Async.Parallel
   |> Async.RunSynchronously // wait till tasks are finished
   |> ignore

runAll ()

Parallel Programming using TPL

  • used for CPU heavy tasks with few to none I/O operations
Parallel.For
Parallel.ForEach

Parallel.Async

  • for async Actions which return () (Unit, void)
Parallel.Async [ asyncAction1, asyncAction2, ... ]

Mailbox Processor (Agent)

  • Agents are state machines that change state based on input messages
open System

type Agent<'T> = MailboxProcessor<'T>

type CounterMessage = 
   | Update of float
   | Reset

module Helpers = 
   let genRandomNumber n = 
      let rnd = new Random()
      float rnd.Next (n, 100)

module MaxAgent = 
   let sampleAgent = Agent.Start(fun inbox -> 
      let rec loop max = async {
         let! msg = inbox.Receive()
         match msg with
         | Reset -> return! loop 0.0
         | Update value -> 
            let max = Math.Max(max, value)
            printfn "Max: %f" max
            do! Async.Sleep 1000
            return! loop max
      }
      loop 0.0
   )

let agent = MaxAgent.sampleAgent
let random = Helpers.genRandomNumber 5
agent.Post(Update random)

References

  1. Asynchronous Programming Model (APM). In: MSDN. Microsoft, abgerufen am 30. Juni 2014 (englisch).