Magellan Linux

Annotation of /alx-src/tags/kernel26-2.6.12-alx-r9/kernel/workqueue.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 628 - (hide annotations) (download)
Wed Mar 4 10:48:58 2009 UTC (15 years, 3 months ago) by niro
Original Path: alx-src/trunk/kernel26-alx/linux/kernel/workqueue.c
File MIME type: text/plain
File size: 13801 byte(s)
import linux sources based on 2.6.12-alx-r9:
 -using linux-2.6.12.6
 -using 2.6.12-ck6 patch set
 -using fbsplash-0.9.2-r3
 -using vesafb-tng-0.9-rc7
 -using squashfs-2.2
 -added cddvd-cmdfilter-drop.patch as ck dropped it
 -added via-epia-dri (cle266) patch
 -added zd1211-svn-32 wlan driver (http://zd1211.ath.cx/download/)
 -added debian patches to zd1211 for wep256 etc

1 niro 628 /*
2     * linux/kernel/workqueue.c
3     *
4     * Generic mechanism for defining kernel helper threads for running
5     * arbitrary tasks in process context.
6     *
7     * Started by Ingo Molnar, Copyright (C) 2002
8     *
9     * Derived from the taskqueue/keventd code by:
10     *
11     * David Woodhouse <dwmw2@infradead.org>
12     * Andrew Morton <andrewm@uow.edu.au>
13     * Kai Petzke <wpp@marie.physik.tu-berlin.de>
14     * Theodore Ts'o <tytso@mit.edu>
15     */
16    
17     #include <linux/module.h>
18     #include <linux/kernel.h>
19     #include <linux/sched.h>
20     #include <linux/init.h>
21     #include <linux/signal.h>
22     #include <linux/completion.h>
23     #include <linux/workqueue.h>
24     #include <linux/slab.h>
25     #include <linux/cpu.h>
26     #include <linux/notifier.h>
27     #include <linux/kthread.h>
28    
29     /*
30     * The per-CPU workqueue (if single thread, we always use cpu 0's).
31     *
32     * The sequence counters are for flush_scheduled_work(). It wants to wait
33     * until until all currently-scheduled works are completed, but it doesn't
34     * want to be livelocked by new, incoming ones. So it waits until
35     * remove_sequence is >= the insert_sequence which pertained when
36     * flush_scheduled_work() was called.
37     */
38     struct cpu_workqueue_struct {
39    
40     spinlock_t lock;
41    
42     long remove_sequence; /* Least-recently added (next to run) */
43     long insert_sequence; /* Next to add */
44    
45     struct list_head worklist;
46     wait_queue_head_t more_work;
47     wait_queue_head_t work_done;
48    
49     struct workqueue_struct *wq;
50     task_t *thread;
51    
52     int run_depth; /* Detect run_workqueue() recursion depth */
53     } ____cacheline_aligned;
54    
55     /*
56     * The externally visible workqueue abstraction is an array of
57     * per-CPU workqueues:
58     */
59     struct workqueue_struct {
60     struct cpu_workqueue_struct cpu_wq[NR_CPUS];
61     const char *name;
62     struct list_head list; /* Empty if single thread */
63     };
64    
65     /* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
66     threads to each one as cpus come/go. */
67     static DEFINE_SPINLOCK(workqueue_lock);
68     static LIST_HEAD(workqueues);
69    
70     /* If it's single threaded, it isn't in the list of workqueues. */
71     static inline int is_single_threaded(struct workqueue_struct *wq)
72     {
73     return list_empty(&wq->list);
74     }
75    
76     /* Preempt must be disabled. */
77     static void __queue_work(struct cpu_workqueue_struct *cwq,
78     struct work_struct *work)
79     {
80     unsigned long flags;
81    
82     spin_lock_irqsave(&cwq->lock, flags);
83     work->wq_data = cwq;
84     list_add_tail(&work->entry, &cwq->worklist);
85     cwq->insert_sequence++;
86     wake_up(&cwq->more_work);
87     spin_unlock_irqrestore(&cwq->lock, flags);
88     }
89    
90     /*
91     * Queue work on a workqueue. Return non-zero if it was successfully
92     * added.
93     *
94     * We queue the work to the CPU it was submitted, but there is no
95     * guarantee that it will be processed by that CPU.
96     */
97     int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
98     {
99     int ret = 0, cpu = get_cpu();
100    
101     if (!test_and_set_bit(0, &work->pending)) {
102     if (unlikely(is_single_threaded(wq)))
103     cpu = 0;
104     BUG_ON(!list_empty(&work->entry));
105     __queue_work(wq->cpu_wq + cpu, work);
106     ret = 1;
107     }
108     put_cpu();
109     return ret;
110     }
111    
112     static void delayed_work_timer_fn(unsigned long __data)
113     {
114     struct work_struct *work = (struct work_struct *)__data;
115     struct workqueue_struct *wq = work->wq_data;
116     int cpu = smp_processor_id();
117    
118     if (unlikely(is_single_threaded(wq)))
119     cpu = 0;
120    
121     __queue_work(wq->cpu_wq + cpu, work);
122     }
123    
124     int fastcall queue_delayed_work(struct workqueue_struct *wq,
125     struct work_struct *work, unsigned long delay)
126     {
127     int ret = 0;
128     struct timer_list *timer = &work->timer;
129    
130     if (!test_and_set_bit(0, &work->pending)) {
131     BUG_ON(timer_pending(timer));
132     BUG_ON(!list_empty(&work->entry));
133    
134     /* This stores wq for the moment, for the timer_fn */
135     work->wq_data = wq;
136     timer->expires = jiffies + delay;
137     timer->data = (unsigned long)work;
138     timer->function = delayed_work_timer_fn;
139     add_timer(timer);
140     ret = 1;
141     }
142     return ret;
143     }
144    
145     static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
146     {
147     unsigned long flags;
148    
149     /*
150     * Keep taking off work from the queue until
151     * done.
152     */
153     spin_lock_irqsave(&cwq->lock, flags);
154     cwq->run_depth++;
155     if (cwq->run_depth > 3) {
156     /* morton gets to eat his hat */
157     printk("%s: recursion depth exceeded: %d\n",
158     __FUNCTION__, cwq->run_depth);
159     dump_stack();
160     }
161     while (!list_empty(&cwq->worklist)) {
162     struct work_struct *work = list_entry(cwq->worklist.next,
163     struct work_struct, entry);
164     void (*f) (void *) = work->func;
165     void *data = work->data;
166    
167     list_del_init(cwq->worklist.next);
168     spin_unlock_irqrestore(&cwq->lock, flags);
169    
170     BUG_ON(work->wq_data != cwq);
171     clear_bit(0, &work->pending);
172     f(data);
173    
174     spin_lock_irqsave(&cwq->lock, flags);
175     cwq->remove_sequence++;
176     wake_up(&cwq->work_done);
177     }
178     cwq->run_depth--;
179     spin_unlock_irqrestore(&cwq->lock, flags);
180     }
181    
182     static int worker_thread(void *__cwq)
183     {
184     struct cpu_workqueue_struct *cwq = __cwq;
185     DECLARE_WAITQUEUE(wait, current);
186     struct k_sigaction sa;
187     sigset_t blocked;
188    
189     current->flags |= PF_NOFREEZE;
190    
191     set_user_nice(current, -5);
192    
193     /* Block and flush all signals */
194     sigfillset(&blocked);
195     sigprocmask(SIG_BLOCK, &blocked, NULL);
196     flush_signals(current);
197    
198     /* SIG_IGN makes children autoreap: see do_notify_parent(). */
199     sa.sa.sa_handler = SIG_IGN;
200     sa.sa.sa_flags = 0;
201     siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
202     do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);
203    
204     set_current_state(TASK_INTERRUPTIBLE);
205     while (!kthread_should_stop()) {
206     add_wait_queue(&cwq->more_work, &wait);
207     if (list_empty(&cwq->worklist))
208     schedule();
209     else
210     __set_current_state(TASK_RUNNING);
211     remove_wait_queue(&cwq->more_work, &wait);
212    
213     if (!list_empty(&cwq->worklist))
214     run_workqueue(cwq);
215     set_current_state(TASK_INTERRUPTIBLE);
216     }
217     __set_current_state(TASK_RUNNING);
218     return 0;
219     }
220    
221     static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
222     {
223     if (cwq->thread == current) {
224     /*
225     * Probably keventd trying to flush its own queue. So simply run
226     * it by hand rather than deadlocking.
227     */
228     run_workqueue(cwq);
229     } else {
230     DEFINE_WAIT(wait);
231     long sequence_needed;
232    
233     spin_lock_irq(&cwq->lock);
234     sequence_needed = cwq->insert_sequence;
235    
236     while (sequence_needed - cwq->remove_sequence > 0) {
237     prepare_to_wait(&cwq->work_done, &wait,
238     TASK_UNINTERRUPTIBLE);
239     spin_unlock_irq(&cwq->lock);
240     schedule();
241     spin_lock_irq(&cwq->lock);
242     }
243     finish_wait(&cwq->work_done, &wait);
244     spin_unlock_irq(&cwq->lock);
245     }
246     }
247    
248     /*
249     * flush_workqueue - ensure that any scheduled work has run to completion.
250     *
251     * Forces execution of the workqueue and blocks until its completion.
252     * This is typically used in driver shutdown handlers.
253     *
254     * This function will sample each workqueue's current insert_sequence number and
255     * will sleep until the head sequence is greater than or equal to that. This
256     * means that we sleep until all works which were queued on entry have been
257     * handled, but we are not livelocked by new incoming ones.
258     *
259     * This function used to run the workqueues itself. Now we just wait for the
260     * helper threads to do it.
261     */
262     void fastcall flush_workqueue(struct workqueue_struct *wq)
263     {
264     might_sleep();
265    
266     if (is_single_threaded(wq)) {
267     /* Always use cpu 0's area. */
268     flush_cpu_workqueue(wq->cpu_wq + 0);
269     } else {
270     int cpu;
271    
272     lock_cpu_hotplug();
273     for_each_online_cpu(cpu)
274     flush_cpu_workqueue(wq->cpu_wq + cpu);
275     unlock_cpu_hotplug();
276     }
277     }
278    
279     static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
280     int cpu)
281     {
282     struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
283     struct task_struct *p;
284    
285     spin_lock_init(&cwq->lock);
286     cwq->wq = wq;
287     cwq->thread = NULL;
288     cwq->insert_sequence = 0;
289     cwq->remove_sequence = 0;
290     INIT_LIST_HEAD(&cwq->worklist);
291     init_waitqueue_head(&cwq->more_work);
292     init_waitqueue_head(&cwq->work_done);
293    
294     if (is_single_threaded(wq))
295     p = kthread_create(worker_thread, cwq, "%s", wq->name);
296     else
297     p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
298     if (IS_ERR(p))
299     return NULL;
300     cwq->thread = p;
301     return p;
302     }
303    
304     struct workqueue_struct *__create_workqueue(const char *name,
305     int singlethread)
306     {
307     int cpu, destroy = 0;
308     struct workqueue_struct *wq;
309     struct task_struct *p;
310    
311     BUG_ON(strlen(name) > 10);
312    
313     wq = kmalloc(sizeof(*wq), GFP_KERNEL);
314     if (!wq)
315     return NULL;
316     memset(wq, 0, sizeof(*wq));
317    
318     wq->name = name;
319     /* We don't need the distraction of CPUs appearing and vanishing. */
320     lock_cpu_hotplug();
321     if (singlethread) {
322     INIT_LIST_HEAD(&wq->list);
323     p = create_workqueue_thread(wq, 0);
324     if (!p)
325     destroy = 1;
326     else
327     wake_up_process(p);
328     } else {
329     spin_lock(&workqueue_lock);
330     list_add(&wq->list, &workqueues);
331     spin_unlock(&workqueue_lock);
332     for_each_online_cpu(cpu) {
333     p = create_workqueue_thread(wq, cpu);
334     if (p) {
335     kthread_bind(p, cpu);
336     wake_up_process(p);
337     } else
338     destroy = 1;
339     }
340     }
341     unlock_cpu_hotplug();
342    
343     /*
344     * Was there any error during startup? If yes then clean up:
345     */
346     if (destroy) {
347     destroy_workqueue(wq);
348     wq = NULL;
349     }
350     return wq;
351     }
352    
353     static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
354     {
355     struct cpu_workqueue_struct *cwq;
356     unsigned long flags;
357     struct task_struct *p;
358    
359     cwq = wq->cpu_wq + cpu;
360     spin_lock_irqsave(&cwq->lock, flags);
361     p = cwq->thread;
362     cwq->thread = NULL;
363     spin_unlock_irqrestore(&cwq->lock, flags);
364     if (p)
365     kthread_stop(p);
366     }
367    
368     void destroy_workqueue(struct workqueue_struct *wq)
369     {
370     int cpu;
371    
372     flush_workqueue(wq);
373    
374     /* We don't need the distraction of CPUs appearing and vanishing. */
375     lock_cpu_hotplug();
376     if (is_single_threaded(wq))
377     cleanup_workqueue_thread(wq, 0);
378     else {
379     for_each_online_cpu(cpu)
380     cleanup_workqueue_thread(wq, cpu);
381     spin_lock(&workqueue_lock);
382     list_del(&wq->list);
383     spin_unlock(&workqueue_lock);
384     }
385     unlock_cpu_hotplug();
386     kfree(wq);
387     }
388    
389     static struct workqueue_struct *keventd_wq;
390    
391     int fastcall schedule_work(struct work_struct *work)
392     {
393     return queue_work(keventd_wq, work);
394     }
395    
396     int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay)
397     {
398     return queue_delayed_work(keventd_wq, work, delay);
399     }
400    
401     int schedule_delayed_work_on(int cpu,
402     struct work_struct *work, unsigned long delay)
403     {
404     int ret = 0;
405     struct timer_list *timer = &work->timer;
406    
407     if (!test_and_set_bit(0, &work->pending)) {
408     BUG_ON(timer_pending(timer));
409     BUG_ON(!list_empty(&work->entry));
410     /* This stores keventd_wq for the moment, for the timer_fn */
411     work->wq_data = keventd_wq;
412     timer->expires = jiffies + delay;
413     timer->data = (unsigned long)work;
414     timer->function = delayed_work_timer_fn;
415     add_timer_on(timer, cpu);
416     ret = 1;
417     }
418     return ret;
419     }
420    
421     void flush_scheduled_work(void)
422     {
423     flush_workqueue(keventd_wq);
424     }
425    
426     /**
427     * cancel_rearming_delayed_workqueue - reliably kill off a delayed
428     * work whose handler rearms the delayed work.
429     * @wq: the controlling workqueue structure
430     * @work: the delayed work struct
431     */
432     void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
433     struct work_struct *work)
434     {
435     while (!cancel_delayed_work(work))
436     flush_workqueue(wq);
437     }
438     EXPORT_SYMBOL(cancel_rearming_delayed_workqueue);
439    
440     /**
441     * cancel_rearming_delayed_work - reliably kill off a delayed keventd
442     * work whose handler rearms the delayed work.
443     * @work: the delayed work struct
444     */
445     void cancel_rearming_delayed_work(struct work_struct *work)
446     {
447     cancel_rearming_delayed_workqueue(keventd_wq, work);
448     }
449     EXPORT_SYMBOL(cancel_rearming_delayed_work);
450    
451     int keventd_up(void)
452     {
453     return keventd_wq != NULL;
454     }
455    
456     int current_is_keventd(void)
457     {
458     struct cpu_workqueue_struct *cwq;
459     int cpu = smp_processor_id(); /* preempt-safe: keventd is per-cpu */
460     int ret = 0;
461    
462     BUG_ON(!keventd_wq);
463    
464     cwq = keventd_wq->cpu_wq + cpu;
465     if (current == cwq->thread)
466     ret = 1;
467    
468     return ret;
469    
470     }
471    
472     #ifdef CONFIG_HOTPLUG_CPU
473     /* Take the work from this (downed) CPU. */
474     static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
475     {
476     struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
477     LIST_HEAD(list);
478     struct work_struct *work;
479    
480     spin_lock_irq(&cwq->lock);
481     list_splice_init(&cwq->worklist, &list);
482    
483     while (!list_empty(&list)) {
484     printk("Taking work for %s\n", wq->name);
485     work = list_entry(list.next,struct work_struct,entry);
486     list_del(&work->entry);
487     __queue_work(wq->cpu_wq + smp_processor_id(), work);
488     }
489     spin_unlock_irq(&cwq->lock);
490     }
491    
492     /* We're holding the cpucontrol mutex here */
493     static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
494     unsigned long action,
495     void *hcpu)
496     {
497     unsigned int hotcpu = (unsigned long)hcpu;
498     struct workqueue_struct *wq;
499    
500     switch (action) {
501     case CPU_UP_PREPARE:
502     /* Create a new workqueue thread for it. */
503     list_for_each_entry(wq, &workqueues, list) {
504     if (create_workqueue_thread(wq, hotcpu) < 0) {
505     printk("workqueue for %i failed\n", hotcpu);
506     return NOTIFY_BAD;
507     }
508     }
509     break;
510    
511     case CPU_ONLINE:
512     /* Kick off worker threads. */
513     list_for_each_entry(wq, &workqueues, list) {
514     kthread_bind(wq->cpu_wq[hotcpu].thread, hotcpu);
515     wake_up_process(wq->cpu_wq[hotcpu].thread);
516     }
517     break;
518    
519     case CPU_UP_CANCELED:
520     list_for_each_entry(wq, &workqueues, list) {
521     /* Unbind so it can run. */
522     kthread_bind(wq->cpu_wq[hotcpu].thread,
523     smp_processor_id());
524     cleanup_workqueue_thread(wq, hotcpu);
525     }
526     break;
527    
528     case CPU_DEAD:
529     list_for_each_entry(wq, &workqueues, list)
530     cleanup_workqueue_thread(wq, hotcpu);
531     list_for_each_entry(wq, &workqueues, list)
532     take_over_work(wq, hotcpu);
533     break;
534     }
535    
536     return NOTIFY_OK;
537     }
538     #endif
539    
540     void init_workqueues(void)
541     {
542     hotcpu_notifier(workqueue_cpu_callback, 0);
543     keventd_wq = create_workqueue("events");
544     BUG_ON(!keventd_wq);
545     }
546    
547     EXPORT_SYMBOL_GPL(__create_workqueue);
548     EXPORT_SYMBOL_GPL(queue_work);
549     EXPORT_SYMBOL_GPL(queue_delayed_work);
550     EXPORT_SYMBOL_GPL(flush_workqueue);
551     EXPORT_SYMBOL_GPL(destroy_workqueue);
552    
553     EXPORT_SYMBOL(schedule_work);
554     EXPORT_SYMBOL(schedule_delayed_work);
555     EXPORT_SYMBOL(schedule_delayed_work_on);
556     EXPORT_SYMBOL(flush_scheduled_work);