- 完整代码
-
Test Code
// Submit ten tasks to the pool. Each task runs 'worker3.js' and receives its own index
// (as a string) as the start message; every notification is logged to the console.
for (let i = 0; i < 10; ++i) {
  const task = new WorkerTask('worker3.js', String(i));
  const results = this.threadPool.addWorkerTask(task);

  results.subscribe({
    next: data => console.log(data),
    error: err => console.log('err:' + err),
    complete: () => console.log(i + ':finish'),
  });
}
-
Web Worker
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/timer';

/** Local typing for the worker global scope, limited to the two members this script calls. */
interface DedicatedWorkerGlobalScope extends Window {
  postMessage(data: string): void;

  close(): void;
}

/**
 * Worker entry point. One second after a message arrives it posts ten replies of the form
 * "<received data>:<index>" (index 0 to 9), then posts 'done' and calls close() on the scope.
 */
onmessage = function (e) {
  // Arrow function: `this` stays the `this` of the enclosing handler.
  const replyAfterDelay = () => {
    const scope = this as DedicatedWorkerGlobalScope;

    let index = 0;
    while (index < 10) {
      scope.postMessage(e.data + ':' + index);
      index += 1;
    }

    scope.postMessage('done');
    scope.close();
  };

  Observable.timer(1000).subscribe(replyAfterDelay);
};
-
ThreadPoolService
import {Injectable} from '@angular/core';
import {Observable} from 'rxjs/Observable';

/**
 * A unit of work for the pool: the worker script to launch and the first message posted to it.
 */
export class WorkerTask {
  /** Observer of the current subscription; assigned by ThreadPoolService.addWorkerTask. */
  public observer;

  constructor(public script: string, public startMessage: string) {
  }
}

/**
 * One slot of the pool. Every run() launches a fresh Worker for the given task and forwards
 * that worker's events to the task's observer until the task ends, then hands the slot back
 * to the owning pool.
 */
class WorkerThread {
  private workerTask: WorkerTask;

  /** Worker of the task in flight, or null while no worker is attached to this slot. */
  private worker: Worker | null = null;

  /** Single listener instance so that it can be removed again once the task ends. */
  private readonly listener = (event: Event) => this.dummyCallback(event);

  constructor(private parentPool: ThreadPoolService) {
  }

  /**
   * Starts `workerTask` on this slot.
   *
   * A task without a script, or a script for which the Worker constructor throws, is reported
   * to the task's observer as an error and the slot is returned to the pool immediately, so a
   * bad task can never permanently consume a slot.
   */
  public run(workerTask: WorkerTask) {
    this.workerTask = workerTask;

    if (this.workerTask.script == null) {
      this.failTask(new Error('WorkerTask has no script to run'));
      return;
    }

    let worker: Worker;
    try {
      worker = new Worker(workerTask.script);
    } catch (err) {
      this.failTask(err instanceof Error ? err : new Error(String(err)));
      return;
    }

    this.worker = worker;
    worker.addEventListener('message', this.listener, false);
    // Error events carry no `data` property, so dummyCallback routes them to observer.error().
    worker.addEventListener('error', this.listener, false);
    worker.postMessage(workerTask.startMessage);
  }

  /**
   * Handles one event from the running worker.
   *
   * - An event with a `data` property is forwarded through observer.next(); the payload
   *   'done' additionally completes the observer and ends the task.
   * - Any other event (for example a worker error event) is passed to observer.error() and
   *   ends the task.
   *
   * When the task ends, the worker is detached and terminated and the slot is returned to
   * the pool. This also happens if a subscriber callback throws.
   */
  public dummyCallback(event) {
    const observer = this.workerTask.observer;
    const isMessage = 'data' in event;
    const done = !isMessage || event.data === 'done';

    try {
      if (isMessage) {
        observer.next(event.data);
        if (done) {
          observer.complete();
        }
      } else {
        observer.error(event);
      }
    } finally {
      if (done) {
        // Detach before the slot is reused: a late message from this worker must not be
        // delivered to the observer of the next task that runs on this slot.
        this.releaseWorker();
        this.parentPool.freeWorkerThread(this);
      }
    }
  }

  /** Removes the listeners from the current worker, terminates it and forgets it. */
  private releaseWorker() {
    if (this.worker !== null) {
      this.worker.removeEventListener('message', this.listener, false);
      this.worker.removeEventListener('error', this.listener, false);
      this.worker.terminate();
      this.worker = null;
    }
  }

  /** Reports `reason` to the current task's observer and returns this slot to the pool. */
  private failTask(reason: Error) {
    try {
      this.workerTask.observer.error(reason);
    } finally {
      this.parentPool.freeWorkerThread(this);
    }
  }
}

/**
 * Runs WorkerTasks on a fixed number of WorkerThread slots (5 by default). Tasks submitted
 * while every slot is busy wait in a FIFO queue and start as soon as a slot is freed.
 */
@Injectable()
export class ThreadPoolService {
  /** Tasks waiting for a free slot, oldest first. */
  private taskQueue: WorkerTask[] = [];
  /** Idle slots. */
  private threadQueue: WorkerThread[] = [];
  private poolSize = 5;

  constructor() {
    this.threadQueue = this.createThreads(this.poolSize);
  }

  /**
   * Replaces the idle slots with `size` new ones.
   *
   * NOTE: slots that are busy at the time of the call are not tracked here; they rejoin the
   * idle queue when their task ends, so the pool can temporarily hold more than `size` slots.
   * Tasks already waiting in the queue are only started when a busy slot is freed.
   */
  public setSize(size: number) {
    this.poolSize = size;
    this.threadQueue = this.createThreads(size);
  }

  /**
   * Returns a cold Observable for `workerTask`. On subscription the task starts on an idle
   * slot if one is available, otherwise it is queued. The Observable emits every message
   * posted by the worker (including the final 'done'), then completes; failures are
   * delivered through the error channel.
   */
  public addWorkerTask(workerTask: WorkerTask) {
    return Observable.create(observer => {
      workerTask.observer = observer;
      if (this.threadQueue.length > 0) {
        // take the slot at the front of the idle queue
        const workerThread = this.threadQueue.shift();
        workerThread.run(workerTask);
      } else {
        // no idle slot: wait until freeWorkerThread picks the task up
        this.taskQueue.push(workerTask);
      }
    });
  }

  /**
   * Called by a WorkerThread when its task has ended. The slot immediately runs the oldest
   * queued task if there is one; otherwise it goes back to the idle queue.
   */
  public freeWorkerThread(workerThread: WorkerThread) {
    if (this.taskQueue.length > 0) {
      // don't put the slot back in the idle queue, execute the next task on it
      const workerTask = this.taskQueue.shift();
      workerThread.run(workerTask);
    } else {
      this.threadQueue.push(workerThread);
    }
  }

  /** Creates `count` new idle slots bound to this pool. */
  private createThreads(count: number): WorkerThread[] {
    const threads: WorkerThread[] = [];
    for (let i = 0; i < count; i++) {
      threads.push(new WorkerThread(this));
    }
    return threads;
  }
}
-
- 难点分析
worker.addEventListener('message', event => this.dummyCallback(event), false);
第二个参数,如果写成 this.dummyCallback
,则回调时,this 环境已经变化,代码运行异常。 event => this.dummyCallback(event)
是 es6 的箭头函数语法,用来绑定 this,es5 的语法可以写成 this.dummyCallback.bind(this)