
ENH: Parallelize CSR (sparse matrix) methods #19717

Open: wants to merge 12 commits into main

Conversation

@alugowski (Contributor) commented Dec 20, 2023

(Closes gh-20845)

Apologies for the wall of text, I want to provide context and reasoning for the changes made here.

Most CSR sparse matrix operations have an outer loop that iterates over the matrix rows. Often each iteration of this loop is, or can easily be made to be, independent. This PR parallelizes this loop.
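
To make the pattern concrete, here is a minimal illustrative sketch (mine for this description, not the code in the PR) of chunking an independent row loop across plain C++11 threads:

#include <algorithm>
#include <cstddef>
#include <thread>
#include <vector>

// Split [0, n_row) into contiguous chunks and run each chunk on its own
// std::thread. Assumes p >= 1 and that f(i) is safe to call concurrently
// for different rows i.
template <class RowFunc>
void for_each_row_parallel(std::size_t n_row, unsigned int p, RowFunc f) {
    std::vector<std::thread> threads;
    std::size_t chunk = (n_row + p - 1) / p;  // rows per thread, rounded up
    for (std::size_t begin = 0; begin < n_row; begin += chunk) {
        std::size_t end = std::min(begin + chunk, n_row);
        threads.emplace_back([=] {
            for (std::size_t i = begin; i < end; ++i) f(i);
        });
    }
    for (auto& t : threads) t.join();
}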

PR is fully implemented and working, but has some product-level questions remaining. See the tradeoffs section below. I'll update the PR to remove code for the rejected options.

Choice of parallelism framework

OpenMP is explicitly off the table as per docs, so no omp parallel for. Common libraries like TBB or Taskflow seem too heavy for just this one application, correct me if I'm wrong. That leaves simple threads or a thread pool.

I chose to use the interface of the C++17 parallel algorithms to make the code future-proof and easy to understand for C++ programmers. I say interface because compiler support for those algorithms is not all there at the moment. Instead, I implemented the required algorithms on top of a thread pool using C++11 threads. This is fairly small and works on all platforms. It seems useful enough to be its own product, so I released it as poolSTL.

Parallel matmul

This is the only method that requires signature changes.

The core SMMP algorithm has two embarrassingly parallel phases: phase 1 calculates the size of the result matrix, then phase 2 fills it in.

The signature change is to the phase 1 implementation, csr_matmat_maxnnz(). Previously this method only calculated the result's nnz. A parallel phase 2 requires a pre-computed indptr (Cp), so the change is to have csr_matmat_maxnnz() return the row sizes as it computes the nnz. A cumsum of the row sizes is Cp. This is done in Python to handle the (rare) case where the output matrix has enough non-zeros to overflow the input matrices' index type.
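
For illustration, the Cp construction is just a running sum accumulated in a wide enough type (the PR does this step in Python precisely so it can widen the index dtype when needed); a minimal sketch:

#include <cstddef>
#include <cstdint>
#include <vector>

// Build the output indptr (Cp) from per-row nonzero counts. Accumulate in
// 64 bits so that a very large result cannot overflow a 32-bit index type.
std::vector<std::int64_t> indptr_from_row_sizes(const std::vector<std::int32_t>& row_nnz) {
    std::vector<std::int64_t> Cp(row_nnz.size() + 1, 0);
    for (std::size_t i = 0; i < row_nnz.size(); ++i) {
        Cp[i + 1] = Cp[i] + row_nnz[i];
    }
    return Cp;
}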

A small logic change is also required. The previous code would only count computed entries, regardless of whether their values were zero; over-allocation is OK. The parallel code cannot do that, as phase 2 is not allowed to emit explicit zeros. Therefore, phase 1 must also compute the values and test them against zero to return an accurate nnz.
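
For reference, a sketch of the phase-1 row kernel in the classic SMMP style (illustrative only; the real csr_matmat_maxnnz differs in details such as how the workspace and row-size array are handled). The point is the final loop, which now inspects the accumulated values instead of just counting touched columns:

// Count the true nonzeros of row i of C = A @ B using the SMMP-style
// linked-list workspace (next, initialized to -1) and dense accumulator
// (sums, initialized to 0). Both arrays are reset before returning, so each
// thread can reuse its own workspace across rows.
template <class I, class T>
I matmat_row_nnz(const I i,
                 const I Ap[], const I Aj[], const T Ax[],
                 const I Bp[], const I Bj[], const T Bx[],
                 I next[], T sums[])
{
    I head = -2, length = 0;
    for (I jj = Ap[i]; jj < Ap[i + 1]; ++jj) {
        const I j = Aj[jj];
        const T v = Ax[jj];
        for (I kk = Bp[j]; kk < Bp[j + 1]; ++kk) {
            const I k = Bj[kk];
            sums[k] += v * Bx[kk];
            if (next[k] == -1) { next[k] = head; head = k; ++length; }
        }
    }
    I nnz = 0;
    for (I n = 0; n < length; ++n) {
        if (sums[head] != 0) ++nnz;   // count only entries that are actually nonzero
        const I temp = head;
        head = next[head];
        next[temp] = -1;              // reset workspace for the next row
        sums[temp] = 0;
    }
    return nnz;
}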

Unfortunately this is extra work. Quick benchmarks suggest the single-threaded code is about 20-30% slower, but this is easily made up for in parallel.

Parallel binop

I only parallelized the canonical version.

Binop faces the same problem as matmul: the size and sparsity pattern of the output is needed for parallelism, but it's unknown. The sequential code would allocate for the worst case, then just construct the result. The parallel code uses matmul's approach: Phase 1 computes Cp in parallel, then phase 2 fills in the column indices and data in parallel. The sequential case remains the same single pass as before.
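
To make the two phases concrete, here is an illustrative sketch for one row of a canonical (sorted, duplicate-free) addition; this is not the PR's code. Phase 1 is run over all rows to get the row sizes (and, via cumsum, Cp), then phase 2 fills each row's pre-computed slot, with no dependencies between rows in either phase:

// Phase 1: number of output entries for row i of C = A + B, i.e. the size
// of the union of the two sorted column lists.
template <class I>
I add_row_nnz(const I i, const I Ap[], const I Aj[], const I Bp[], const I Bj[]) {
    I a = Ap[i], a_end = Ap[i + 1];
    I b = Bp[i], b_end = Bp[i + 1];
    I n = 0;
    while (a < a_end && b < b_end) {
        if (Aj[a] == Bj[b])     { ++a; ++b; }
        else if (Aj[a] < Bj[b]) { ++a; }
        else                    { ++b; }
        ++n;
    }
    return n + (a_end - a) + (b_end - b);
}

// Phase 2: merge row i into the output slot Cp[i] .. Cp[i+1) computed from
// the cumulative sum of the phase-1 sizes. Independent of all other rows.
template <class I, class T>
void add_row_fill(const I i,
                  const I Ap[], const I Aj[], const T Ax[],
                  const I Bp[], const I Bj[], const T Bx[],
                  const I Cp[], I Cj[], T Cx[]) {
    I a = Ap[i], a_end = Ap[i + 1];
    I b = Bp[i], b_end = Bp[i + 1];
    I out = Cp[i];
    while (a < a_end || b < b_end) {
        if (b >= b_end || (a < a_end && Aj[a] < Bj[b])) {
            Cj[out] = Aj[a]; Cx[out] = Ax[a]; ++a;
        } else if (a >= a_end || Bj[b] < Aj[a]) {
            Cj[out] = Bj[b]; Cx[out] = Bx[b]; ++b;
        } else {  // same column present in both operands
            Cj[out] = Aj[a]; Cx[out] = Ax[a] + Bx[b]; ++a; ++b;
        }
        ++out;
    }
}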

One weird observation I can't fully explain is that the sequential case now appears to be significantly faster than before, often around 2x. The only change I can identify is that the new code breaks some accidental inter-loop data dependencies, but that doesn't seem like it should be enough for the performance boost. Happy to be corrected.

Parallel other methods

The most obvious beneficiary is csr_sort_indices: it is used everywhere and parallelizes very well. I also added an optimization that removes a temporary copy from the sort.

Other embarrassingly parallel methods just replace their outer for loops with parallel std::for_each. Some methods that check the format instead use std::all_of.

Parallelism control

Note: methods stay sequential unless the matrix is fairly large; the current definition of "large" is 64k non-zeros. This may seem overly large, but sequential ops are fast. pnumpy also uses this threshold.
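
Schematically, the size gate looks something like this (illustrative only; the helper name and exact threshold are not this PR's API, and for_each_row_parallel refers to the sketch near the top of this description):

#include <cstddef>
#include <cstdint>

// Only go parallel when the matrix is "large" and more than one worker is
// configured; otherwise run the plain sequential row loop.
constexpr std::int64_t kParallelNnzThreshold = 64 * 1024;  // illustrative value

template <class Body>
void maybe_parallel_rows(std::int64_t nnz, std::int64_t n_row,
                         unsigned int workers, Body body) {
    if (workers <= 1 || nnz < kParallelNnzThreshold) {
        for (std::size_t i = 0; i < static_cast<std::size_t>(n_row); ++i) body(i);
    } else {
        for_each_row_parallel(static_cast<std::size_t>(n_row), workers, body);
    }
}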

Parallelism is controlled with threadpoolctl:

with threadpoolctl.threadpool_limits(limits=4):
    res = matrix1 @ matrix2

The PR default is single-threaded; the code above must be used to enable parallelism.

Benchmarks

The standard scipy benchmarks are too small to really exercise this PR. On my machine few take longer than a millisecond. I have a branch with much bigger versions of the standard benchmarks: 6b78eac

If folks could try on their own benchmarks or their own workflows that would be very helpful.

Tests

I configured the tests to run twice: once sequentially and once with parallelism forced on even for tiny problems. Happy to change this if it's excessive.

Product level decisions and tradeoffs

Is the worker count API ok?

Is threadpoolctl enough?

Other parallel SciPy C++ codes (fft, optimize) use explicit worker arguments. That would be difficult here as none of these methods are called directly by the user, and many are operators.

Any other (or additional) options?

Thread pool vs Dedicated threads

A common thread pool started on first use of a large op (Pros: threads are reused) vs starting and joining dedicated threads for each op (Pros: smaller library size, no idle worker threads left behind).

Controlled by USE_POOL constant in scipy/sparse/sparsetools/util_par.h.

What ops to parallelize

Library size is the main downside of this PR. Note that all the methods in csr.h are templates with quite a few instantiations each. The dedicated threads create about one more symbol per method, while the thread pool requires several more (due to wrappers, vtables, etc.). I've already more than halved the .so size from my first attempts; I would appreciate further ideas from folks with stronger nm skills than mine. Also, I understand that the final release build does some symbol stripping, so the following may be an exaggeration.

To further control library size this PR includes two more switches in util_par.h:

  • PARALLEL_BINOP whether to parallelize binops.
  • PARALLEL_OTHERS whether to parallelize methods other than matmul, sort_elements, binop. Currently off.

It should also be possible to parallelize just some binops (like plus, minus) and save library size on the others.

For context, rough sizes of _sparsetools.cpython-310-darwin.so as built by python dev.py build on my machine:

  • main branch: 4.0 MB
  • USE_POOL=0/PARALLEL_BINOP=0 : 4.8 MB
  • USE_POOL=0/PARALLEL_BINOP=1 : 8.0 MB
  • USE_POOL=1/PARALLEL_BINOP=0 : 5.4 MB
  • USE_POOL=1/PARALLEL_BINOP=1 : 13 MB

If the choice is to go dedicated threads (no pool) then we can rip just those methods from poolSTL and drop that file entirely. The pure threads functions are <100 lines total.

@alugowski (Contributor Author)

Benchmarks

I'm attaching the ASV benchmark comparison of this PR against the main branch, using the bigger benchmarks I mentioned (6b78eac).

Running on an M1 MacBook Pro (6 + 2 cores).

All results are single repetitions because I couldn't get ASV to do repeats, hence noisy (ignore the noisy ~1 ms runs). But the pattern is clear: the bigger benchmarks are significantly faster.

Random matrix multiply:

| Change   | Before [6b78eacb] <bigbench>   | After [a9acb88f] <parallel_csr>   |   Ratio | Benchmark (Parameter)                             |
|----------|--------------------------------|-----------------------------------|---------|---------------------------------------------------|
| +        | 1.03±0ms                       | 2.62±0ms                          |    2.55 | sparse.MatmulLargeCSRRand.time_large('10K,1M')    |
| +        | 132±0μs                        | 174±0μs                           |    1.33 | sparse.MatmulLargeCSRRand.time_large('1K,1K')     |
| -        | 1.90±0ms                       | 1.31±0ms                          |    0.69 | sparse.MatmulLargeCSRRand.time_large('100K,100K') |
| -        | 4.29±0ms                       | 1.92±0ms                          |    0.45 | sparse.MatmulLargeCSRRand.time_large('1M,10K')    |
| -        | 57.2±0ms                       | 25.5±0ms                          |    0.45 | sparse.MatmulLargeCSRRand.time_large('1M,1M')     |
| -        | 310±0ms                        | 87.7±0ms                          |    0.28 | sparse.MatmulLargeCSRRand.time_large('10M,1M')    |
| -        | 3.28±0s                        | 824±0ms                           |    0.25 | sparse.MatmulLargeCSRRand.time_large('10M,10M')   |
| -        | 6.23±0s                        | 1.50±0s                           |    0.24 | sparse.MatmulLargeCSRRand.time_large('15M,15M')   |

Sort elements:

| Change   | Before [6b78eacb] <bigbench>   | After [a9acb88f] <parallel_csr>   |   Ratio | Benchmark (Parameter)             |
|----------|--------------------------------|-----------------------------------|---------|-----------------------------------|
| +        | 799±0μs                        | 953±0μs                           |    1.19 | sparse.Sort.time_sort('Rand10')   |
| -        | 767±0μs                        | 283±0μs                           |    0.37 | sparse.Sort.time_sort('Rand25')   |
| -        | 1.45±0ms                       | 404±0μs                           |    0.28 | sparse.Sort.time_sort('Rand50')   |
| -        | 3.14±0ms                       | 805±0μs                           |    0.26 | sparse.Sort.time_sort('Rand100')  |
| -        | 50.0±0ms                       | 12.4±0ms                          |    0.25 | sparse.Sort.time_sort('Rand2000') |
| -        | 5.67±0ms                       | 1.29±0ms                          |    0.23 | sparse.Sort.time_sort('Rand200')  |
| -        | 26.4±0ms                       | 4.94±0ms                          |    0.19 | sparse.Sort.time_sort('Rand1000') |

Large arithmetic ops:

*truncated for size, see Arithmetic.txt below*

| Change   | Before [6b78eacb] <bigbench>   | After [a9acb88f] <parallel_csr>   |   Ratio | Benchmark (Parameter)                                                   |
|----------|--------------------------------|-----------------------------------|---------|-------------------------------------------------------------------------|
| -        | 176±0ms                        | 91.6±0ms                          |    0.52 | sparse.Arithmetic.time_arithmetic('csr', 'AB', 1000, True, '__mul__')   |
| -        | 3.46±0ms                       | 1.81±0ms                          |    0.52 | sparse.Arithmetic.time_arithmetic('csr', 'BB', 250, False, '__sub__')   |
| -        | 11.0±0ms                       | 5.47±0ms                          |    0.5  | sparse.Arithmetic.time_arithmetic('csr', 'BA', 250, False, '__mul__')   |
| -        | 5.67±0ms                       | 2.73±0ms                          |    0.48 | sparse.Arithmetic.time_arithmetic('csr', 'AA', 250, True, '__mul__')    |
| -        | 55.2±0ms                       | 26.4±0ms                          |    0.48 | sparse.Arithmetic.time_arithmetic('csr', 'BB', 1000, False, '__sub__')  |
| -        | 3.53±0ms                       | 1.65±0ms                          |    0.47 | sparse.Arithmetic.time_arithmetic('dia', 'BB', 250, False, '__sub__')   |
| -        | 3.62±0ms                       | 1.67±0ms                          |    0.46 | sparse.Arithmetic.time_arithmetic('coo', 'BB', 250, False, '__sub__')   |
| -        | 91.8±0ms                       | 39.4±0ms                          |    0.43 | sparse.Arithmetic.time_arithmetic('csr', 'AA', 1000, False, '__mul__')  |
| -        | 23.6±0ms                       | 10.0±0ms                          |    0.43 | sparse.Arithmetic.time_arithmetic('dia', 'BB', 250, False, '__mul__')   |
| -        | 92.0±0ms                       | 37.7±0ms                          |    0.41 | sparse.Arithmetic.time_arithmetic('csr', 'AA', 1000, True, '__mul__')   |
| -        | 190±0ms                        | 74.5±0ms                          |    0.39 | sparse.Arithmetic.time_arithmetic('csr', 'AB', 1000, False, '__mul__')  |
| -        | 211±0ms                        | 82.1±0ms                          |    0.39 | sparse.Arithmetic.time_arithmetic('csr', 'BA', 1000, False, '__mul__')  |
| -        | 22.9±0ms                       | 8.94±0ms                          |    0.39 | sparse.Arithmetic.time_arithmetic('csr', 'BB', 250, False, '__mul__')   |
| -        | 208±0ms                        | 78.6±0ms                          |    0.38 | sparse.Arithmetic.time_arithmetic('csr', 'BA', 1000, True, '__mul__')   |
| -        | 23.5±0ms                       | 8.05±0ms                          |    0.34 | sparse.Arithmetic.time_arithmetic('coo', 'BB', 250, True, '__mul__')    |
| -        | 23.6±0ms                       | 8.13±0ms                          |    0.34 | sparse.Arithmetic.time_arithmetic('dia', 'BB', 250, True, '__mul__')    |
| -        | 23.2±0ms                       | 7.65±0ms                          |    0.33 | sparse.Arithmetic.time_arithmetic('coo', 'BB', 250, False, '__mul__')   |
| -        | 23.5±0ms                       | 7.71±0ms                          |    0.33 | sparse.Arithmetic.time_arithmetic('csr', 'BB', 250, True, '__mul__')    |
| -        | 401±0ms                        | 123±0ms                           |    0.31 | sparse.Arithmetic.time_arithmetic('csr', 'BB', 1000, True, '__mul__')   |
| -        | 395±0ms                        | 117±0ms                           |    0.3  | sparse.Arithmetic.time_arithmetic('csr', 'BB', 1000, False, '__mul__')  |

Raw output from python dev.py bench --compare:

compare-sparse.Arithmetic.txt
compare-sparse.Conversion.txt
compare-sparse.MatmulLargeCSRPoissonSquared.txt
compare-sparse.MatmulLargeCSRRand.txt
compare-sparse.Matvec.txt
compare-sparse.NullSlice.time_getcol.txt
compare-sparse.Random.txt
compare-sparse.Sort.txt
compare-sparse_linalg_expm.Expm.txt
compare-sparse_linalg_solve.Bench.time_solve.txt
compare-sparse_matrix_power.txt

@alugowski (Contributor Author)

CI errors seem to be scipy/optimize/tests/test_linprog.py::TestLinprogIPSparse::test_bug_6139. That test passes for me locally, and I can't figure out how it relates to this change. Any insight appreciated!

@lucascolley added the enhancement, scipy.sparse, needs-decision, and C/C++ labels on Dec 20, 2023
@rgommers (Member)

Thanks @alugowski, this looks very interesting! The first question that came to mind for me regarding benchmarks/performance is how linear the scaling is with the requested number of threads. E.g., use an array of size 64k x 32, and plot time taken by parallelized operations as a function of the number of threads. The closer to linear the scaling the better. Did you try anything like that already?

CI errors seem to be scipy/optimize/tests/test_linprog.py::TestLinprogIPSparse::test_bug_6139. That test passes for me locally, and I can't figure out how it relates to this change. Any insight appreciated!

I can't reproduce it either, but from the CI log it looks to me like the solution only slightly exceeds the requested atol=1e-8, so this is unlikely to be a significant issue, more a minor tolerance thing for an inherently unstable test.

The CircleCI job shows a build error in csr.cxx, that seems more important to sort out.

@rgommers (Member)

Re API for parallelism: your use of threadpoolctl sounds like a good idea to me, and is probably enough; given your explanation of why workers isn't a great fit here, I'm not sure what else would help. Perhaps a global switch to toggle between single-threaded and fully automatic (using all available CPU cores), but that's a pretty blunt tool and we try to avoid global state.

I don't have immediate ideas for further operations, probably best to complete the work on this first (or at least till we have a strategy for getting this merged).

@lucascolley removed the needs-decision label on Dec 20, 2023
@alugowski (Contributor Author)

I'll write some code for a speedup benchmark.

The CircleCI job shows a build error in csr.cxx, that seems more important to sort out.

That'll take some thought; there is no error, just "killed". My only guess is a compile-unit size issue, which relates to:

I don't have immediate ideas for further operations, probably best to complete the work on this first (or at least till we have a strategy for getting this merged).

I need to clarify: I agree further non-CSR ops would be worth discussing later. What I meant is that we may have to decide what to turn off in this PR due to library size issues. Hence the three groups:

  • always on: matmul and sort_elements. Relatively small compilation penalty and a large bang for the buck.
  • binops: There are many of these, hence parallelizing these nearly doubles _sparsetools.so size. On in the PR.
  • others: all the other embarrassingly parallel ops in csr.h, like to_dense, scale_columns, etc. The PR has parallel code for these, but it is turned off, purely for library size reasons.

I'm hoping someone has ideas on how to not blow up the library size. My best guess is that having both the parallel and sequential code paths doubles the generated code size, but it's unclear why that would be the case, as both call the same lambdas. Perhaps the compiler inlines too much.

@alugowski (Contributor Author)

Speedup of tocsr (mostly sort_elements), two binops, and a matmul on a random matrix with 5M nonzeros:

[plot: speedup vs. threads, random matrix, 5M nonzeros]

Of course the sparsity pattern matters a lot with any sparse matrix op. A Poisson 2D grid is often a difficult one. Here:

[plot: speedup vs. threads, Poisson 2D grid matrix]

Part of the reason for worse speedup is that the ops in general are significantly faster. The random matmul takes 9.8s, while the Poisson 2d matmul takes only 0.13s.

The binops are difficult to decide on. In those plots I re-enabled the parallel version of the is_canonical method; without it the speedup barely touches 1.5x at these sizes. That's still a nice speedup, but is it worth it?

Note that parallel binops start to shine at much larger sizes:

[plot: speedup vs. threads, random matrix, 50M nonzeros]

Hence the difficult tradeoff decision. Is the extra complexity worth it for only large matrices? I think if only plus/minus were parallelized that might be ok, but I'd also be happy with sequential binops.

Attaching the script I used. Command for first plot:
python dev.py python speedup.py 5_000_000 random plot tocsr add mul matmul

speedup.py.txt

scipy/sparse/__init__.py: outdated review thread (resolved)
@rgommers (Member)

I noticed the comment for M1 and thought it could cause some of the levelling off; I believe M1's always have 4 efficiency cores, not 2. However, I repeated the benchmarks on a machine with 12 physical cores and it doesn't look better:

[plot: speedup vs. threads on a 12-core machine]

This is with the default build settings in this PR. Something is wrong here though, these curves should be much closer to linear. A square root type leveling off indicates that there's communication overhead between threads where that is not expected (elementwise ops are the worst here, and they shouldn't have cross-thread data passing), or maybe the data chunk size is chosen too small, or something else like that. It may be worth profiling with something like py-spy to figure out what is going on. If we have C++-level threading without callbacks to Python, I think we should only offer a parallel API with expected (more-or-less linear) scaling up to a reasonable number of threads (e.g., 64).

@rgommers (Member)

OpenMP is explicitly off the table as per docs, so no omp parallel for. Common libraries like TBB or Taskflow seem too heavy for just this one application, correct me if I'm wrong. That leaves simple threads or a thread pool.

Yes, we're not ready to revisit OpenMP I'm afraid. And yes, TBB & co are a bit too heavy for us.

@alugowski (Contributor Author) commented Dec 21, 2023

I noticed the comment for M1 and thought it could cause some of the levelling off; I believe M1's always have 4 efficiency cores, not 2.

Mine is 6 power + 2 efficiency.

However, I repeated the benchmarks on a machine with 12 physical cores and it doesn't look better:
[plot: speedup vs. threads on a 12-core machine]

This is with the default build settings in this PR. Something is wrong here though, these curves should be much closer to linear. A square root type leveling off indicates that there's communication overhead between threads where that is not expected (elementwise ops are the worst here, and they shouldn't have cross-thread data passing), or maybe the data chunk size is chosen too small, or something else like that. It may be worth profiling with something like py-spy to figure out what is going on. If we have C++-level threading without callbacks to Python, I think we should only offer a parallel API with expected (more-or-less linear) scaling up to a reasonable number of threads (e.g., 64).

It's simpler than that. The binop is largely a memory scan, so it's dominated by memory bandwidth. One way to show that is to make the base operation more compute-intensive so memory no longer dominates: in csr_elmul_csr(), wrap the multiplies<T>() with expensive_op(std::multiplies<T>()):

template <class Op>
class expensive_op {
public:
    explicit expensive_op(Op op): op(op) {}
    using T = typename Op::result_type;

    T operator()(const T& a, const T& b) const {
        int n = (int)op(a, b);
        // Busy-work loop to make each element compute-bound; its result feeds
        // the branch below so the compiler cannot optimize it away.
        for (int i = 1; i < 500; ++i) {
            n = (i % 2) ? (3*n + 1) : (n / 2);
        }
        if (n == 0) return 0;
        return op(a, b);
    }

    Op op;
};

Now python dev.py python speedup.py 500_000 random plot add mul shows that the expensified mul does scale as expected:
[plot: scaling of add/mul with the expensive_op wrapper]

The weird shape is because p>1 has to do twice the work, so there is no speedup at 2 threads and a slope of ~0.5 is expected after that. Without the p=1 optimization (add true || to the if to force the parallel path), the scaling is nearly perfect:

[plot: scaling with the p=1 optimization disabled]

So my claim is that the poor scaling is a natural property of the binop operation. The main improvement would be to only scale enough to not oversubscribe memory bandwidth. To me it looks like my code works well on very large matrices, maybe 1M nonzeros or larger; one option would be to limit the parallel version to that size. This code has no inter-thread communication at all (the two phases remove that need).

The other kink with binops is that the handler first checks both operands with csr_has_canonical_format (which is also just a memory-bound scan). I've parallelized that op, but it's in the OTHERS group: I enabled it for the plots in my previous comment, but it's disabled in the PR. Set PARALLEL_OTHERS to 1 in util_par.h to reproduce my plots.

@alugowski (Contributor Author)

A note about large p, like 32 or 64: those sizes can uncover some non-obvious serialization. A big one is allocation: the standard library's allocator is thread-safe, but it can become a hotspot at large thread counts. This PR creates as few splits as possible, so it should suffer the least. Unfortunately, real solutions (like arena allocators or other tricks) live in libraries like TBB.

@alugowski (Contributor Author)

I've implemented a way to parallelize only some binops, and enabled it for plus, minus, and multiply only. That reduces the compilation time, so I enabled the other parallel ops too. Library size is smaller than before and all ops are parallel.

I also tweaked the parallelization thresholds a bit: the memory-bound operations now have to be larger to be parallelized, around 1M nonzeros. It should basically never be worse now. Sure, 2x isn't amazing, but it's not nothing either. Either way, those single-scan methods are fairly fast in absolute terms, so I wouldn't shed a tear if they were excluded.

I think this is generally a good tradeoff. You should no longer see any differences when you run the speedup script. I also added a matvec bench to the script:
[plot: updated speedups, 5M nonzeros, including matvec]

Just for context and a sanity check, I've also added a random2d matrix type that creates an ndarray, to see how BLAS scales:

[plot: dense matmul scaling via BLAS, for comparison]
(OpenBLAS on macOS, default from numpy 1.26.2 wheels)

Makes me feel a little better :)

speedup.py.txt

@alugowski (Contributor Author)

I know I'm a broken record about library size, but even on the main branch binops account for half of _sparsetools.so size (~2MB worth), and about 5% of overall python dev.py build runtime. These things are important so I'm trying to be upfront about it.

@alugowski (Contributor Author)

I've simplified the PR based on feedback:

The parallelism utilities are broken down as:

  • util_par_algorithms.h: everything needed to parallelize any for loop. Just convert the loop to use std::for_each and pass the provided threaded execution policy: std::for_each(util_par::threaded(4), ...) (see the sketch below).
  • util_par_config.h: stores the remembered worker count (set by the threadpoolctl controller) and supplies a par_if method to decide between parallel and sequential execution based on data structure size.
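
As a rough usage sketch of the conversion described in the first bullet (the util_par::threaded policy and its for_each overload are assumed to work as described above; depending on where the overload lives it may need to be spelled util_par::for_each, as discussed in the review comments below; the kernel itself is a hypothetical example, and a counting range is emulated with a vector of row indices):

#include <algorithm>
#include <cstddef>
#include <numeric>
#include <vector>

// Hypothetical kernel: scale every stored value in row i by scale[i]. Each
// row is independent, so the outer loop can take the threaded policy.
template <class I, class T>
void scale_rows_threaded(const I n_row, const I Ap[], T Ax[],
                         const T scale[], unsigned int workers) {
    std::vector<I> rows(static_cast<std::size_t>(n_row));
    std::iota(rows.begin(), rows.end(), I(0));
    std::for_each(util_par::threaded(workers), rows.begin(), rows.end(),
                  [&](const I i) {
                      for (I jj = Ap[i]; jj < Ap[i + 1]; ++jj) {
                          Ax[jj] *= scale[i];
                      }
                  });
}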

The PR looks big mostly because of the signature change in matmul that propagates through several layers. The "changes" in BSR are simply restoring the old pre-parallel CSR matmul_maxnnz that was reused by BSR.

@rgommers (Member)

Thanks for adding a lot of detail here @alugowski. Looks like you're right about the scaling and memory bandwidth indeed.

I've simplified the PR based on feedback:

These changes make sense I think, and should make it easier to get this merged since the wins are clearer without much of a downside (beyond a bit more code complexity of course).

Looks like there's a crash in _mul_sparse_matrix in the Windows job that builds sdist/wheel (so uses -O3, unlike the dev.py builds).

@rgommers (Member) commented Jan 4, 2024

@alugowski one other question: did you consider fork-safety of the threadpool? The pocketfft threadpool implementation uses pthread_atfork shutdown/restart to handle this (hat tip to @peterbell10 for pointing this out).

To be more precise - this is needed because multiprocessing is doing fork without exec, after threads may already be created. This is the reason why we can't use OpenMP (xref gh-10239). More CPython's fault than OpenMP's probably (see python/cpython#84559), but it is what it is so we need to be robust against this.

`exclusive_scan` was helpful to compute `Cp` in several places, but with no parallel binops that's no longer true.
@alugowski (Contributor Author)

@alugowski one other question: did you consider fork-safety of the threadpool? The pocketfft threadpool implementation uses pthread_atfork shutdown/restart to handle this (hat tip to @peterbell10 for pointing this out).

To be more precise - this is needed because multiprocessing is doing fork without exec, after threads may already be created. This is the reason why we can't use OpenMP (xref gh-10239). More CPython's fault than OpenMP's probably (see python/cpython#84559), but it is what it is so we need to be robust against this.

Good question. There is no longer a thread pool, and no mutexes or other fork-unfriendly constructs. Since fork() only carries over the calling thread to the child, I believe there is nothing to handle.

The original PR had an optional thread pool, but it had to be manually enabled with USE_POOL=1, else the default fork-join model was used. Not fork(), just starting and joining threads in each parallel function call. All the performance analysis we talked about above is with this model. I saw negligible performance benefit from the pool, just more code, so I removed that unused code about a week ago.

All threading logic is in this one function:

/*
 * Chunk the range [first, last) according to the execution policy's
 * thread count, and spawn threads for each chunk. The last chunk
 * (or only chunk if single-threaded) is executed in the calling thread.
 */
template <class RandIt, class Chunk, typename... A>
void parallel_chunk_for(util_par::threaded &&policy,
                        RandIt first, RandIt last,
                        Chunk chunk, A&&... chunk_args) {
    std::vector<std::thread> threads;
    // Calculate number of iterations per chunk
    std::size_t num_steps = (std::size_t)std::distance(first, last);
    unsigned int p = policy.get_num_threads();
    std::size_t chunk_size = (num_steps / p) + ((num_steps % p) > 0 ? 1 : 0);
    while (first != last) {
        RandIt loop_end = first + std::min(chunk_size, (std::size_t)std::distance(first, last));
        if (loop_end == last) {
            // execute the last (or only) chunk in the calling thread
            chunk(first, loop_end, std::forward<A>(chunk_args)...);
        } else {
            threads.emplace_back(std::thread(std::forward<Chunk>(chunk),
                                             first, loop_end,
                                             std::forward<A>(chunk_args)...));
        }
        first = loop_end;
    }
    for (auto& thread : threads) {
        if (thread.joinable()) {
            thread.join();
        }
    }
}

@alugowski (Contributor Author) commented Jan 5, 2024

Looks like there's a crash in _mul_sparse_matrix in the Windows job that builds sdist/wheel (so uses -O3, unlike the dev.py builds).

The last two test failures were true head-scratchers. Apologies it took so long to figure them out. The MSVC failure was caused by my use of std::exclusive_scan to compute Cp in csr_tocsc(), replacing a manual loop. I can't explain why this causes a crash there, but it's not a needed change, so I just reverted it (which made the PR slightly smaller, too).

The gcc8 workflow failure is even weirder. I spent a lot of time trying to figure out what's different about GCC 8, and why that one test failed when it doesn't use any sparse methods at all. But the issue wasn't the compiler; it was the pytest fixture in test_base.py. Again this makes no sense to me, since the tests are not related at all, test_base runs much later than the failing test, and solver success shouldn't be affected anyway. But moving the fixture solved the issue. The gcc8 workflow uses a different way to run tests (not python dev.py tests). I'm using a pytest fixture to run all tests both threaded and sequentially.

Now CircleCI is complaining about a missing doc_requirements.txt, which is something this PR does not touch.

@lucascolley (Member)

Now CircleCI is complaining about a missing doc_requirements.txt, which is something this PR does not touch.

Merging main will fix this; the file was recently moved to requirements/doc.txt.

@thomasjpfan (Contributor)

If C++ parallel has a wait policy, it could end up stalling when switching between different parallel contexts: OpenMathLib/OpenBLAS#3187 and OpenMathLib/OpenBLAS#3187 (comment)

In the issue, the example switches between OpenBLAS and OpenMP parallel contexts and a threadpool's wait policy causes the program to slow down.

@alugowski (Contributor Author)

If C++ parallel has a wait policy, it could end up stalling when switching between different parallel contexts: OpenMathLib/OpenBLAS#3187 and OpenMathLib/OpenBLAS#3187 (comment)

In the issue, the example switches between OpenBLAS and OpenMP parallel contexts and a threadpool's wait policy causes the program to slow down.

The linked comment says the issue is that OpenMP uses active waiting in its own thread pool, meaning OpenMP worker threads spin waiting for work. This crowds out any other workloads. There's nothing to be done about that, besides using OpenMP yourself. The parallel code in this PR does not actively wait, and neither did the thread pool that was originally in the PR.

template <class ExecPolicy, class RandIt, class UnaryFunc>
util_par::internal::enable_if_threaded<ExecPolicy, void>
for_each(ExecPolicy &&policy, RandIt first, RandIt last, UnaryFunc f) {
Review comment (Member):

You should just call this util_par::for_each to avoid confusion.

util_par::internal::parallel_chunk_for(
    std::forward<ExecPolicy>(policy),
    first, last,
    std::for_each<RandIt, UnaryFunc>, f);
Review comment (Member):

Taking pointers to standard library templates like this is not allowed; you should wrap it in a lambda.

} else {
    threads.emplace_back(std::thread(std::forward<Chunk>(chunk),
                                     first, loop_end,
                                     std::forward<A>(chunk_args)...));
Review comment (Member):

If you ever move from chunk_args, then only one thread will see a valid argument.

template <class RandIt, class Chunk, typename... A>
void parallel_chunk_for(util_par::threaded &&policy,
                        RandIt first, RandIt last,
                        Chunk chunk, A&&... chunk_args) {
Review comment (Member):

If binary size is a concern, a standard technique would be to have a type-erased version which can execute either in serial or parallel. This prevents the implementation being inlined into the serial case and means the std::thread code is never inlined anywhere.

e.g. something like

using loop_fn_t = void (void*, int64_t, int64_t);  // context, begin, end
void parallel_for(int64_t num_threads, int64_t num_steps, void* context, loop_fn_t * loop_fn);
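
To flesh that suggestion out, one possible shape (illustrative only; the names, the chunking policy, and the trampoline are mine, not from this PR or the review) is a single type-erased core plus a thin per-kernel wrapper:

#include <algorithm>
#include <cstdint>
#include <thread>
#include <vector>

using loop_fn_t = void(void*, std::int64_t, std::int64_t);  // context, begin, end

// Compiled exactly once: all std::thread machinery lives here and is never
// inlined into the csr.h template instantiations.
inline void parallel_for(std::int64_t num_threads, std::int64_t num_steps,
                         void* context, loop_fn_t* loop_fn) {
    std::vector<std::thread> threads;
    const std::int64_t p = std::max<std::int64_t>(1, num_threads);
    const std::int64_t chunk = (num_steps + p - 1) / p;
    for (std::int64_t begin = 0; begin < num_steps; begin += chunk) {
        const std::int64_t end = std::min(begin + chunk, num_steps);
        threads.emplace_back(loop_fn, context, begin, end);
    }
    for (auto& t : threads) t.join();
}

// Per-kernel cost is reduced to this thin trampoline: a captureless lambda
// converts to a plain function pointer, and the loop body hides behind void*.
template <class Body>
void parallel_for_body(std::int64_t num_threads, std::int64_t num_steps, Body body) {
    loop_fn_t* trampoline = [](void* ctx, std::int64_t b, std::int64_t e) {
        Body& f = *static_cast<Body*>(ctx);
        for (std::int64_t i = b; i < e; ++i) f(i);
    };
    parallel_for(num_threads, num_steps, &body, trampoline);  // joins before returning
}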

OutputIt inclusive_scan(InputIt first, InputIt last, OutputIt result)
{
#if (!defined(_GLIBCXX_RELEASE) || _GLIBCXX_RELEASE >= 9)
return std::inclusive_scan(first, last, result);
Review comment (Member):

Do we know what method the standard libraries use to implement the parallel algorithms and if they might introduce any issues with fork?

std::vector<I>(n_col, -1), // mask
std::vector<T>(n_col, 0)); // sums
}, [&](I i, auto& workspace){
auto& [mask, sums] = workspace;
Review comment (Member):

If you change the signature to (I first, I last) you could iterate over the whole chunk at once, avoiding the workspace issue and allowing smarter algorithms such as computing the per thread nnz as you go and summing the per-thread values at the end. This would avoid you having to loop over C_row_nnz twice.

Comment on lines +12 to +13
int num_workers = 1;
// int num_workers = std::min(16, std::thread::hardware_concurrency());
Review comment (Contributor):

First, thanks for doing this! It'd be really great to have this.

But why set the default number of threads to 1 here? In other libraries (numpy (or at least, its BLAS backend), dask, numba, etc.) the default number of threads tends to be based on the local hardware.

Review comment (Member):

Not quite. NumPy, SciPy, scikit-learn are all single-threaded by default, and SciPy/scikit-learn have keywords to switch to multiple threads. The only exception is BLAS/LAPACK calls, and that's because it's not possible to control at runtime in a reasonable manner (and exact threading depends on build settings and env vars). I've been in some discussions with pandas devs as well, where the majority opinion was that single-threaded is the correct default. If you control the whole stack with your own framework (e.g., PyTorch/TF/JAX), it's a much easier story, but that doesn't apply here.

Using all threads by default seems logical at first sight, but you find soon enough that it breaks things when there's higher-level parallelism (e.g., Dask, multiprocessing).

It'd be great to have better coordination on parallelism control, but it's tough. See https://thomasjpfan.github.io/parallelism-python-libraries-design/ for a good summary of where we are at and what the options are to improve.

Review comment (Contributor):

Apologies for the late response!

NumPy, SciPy, scikit-learn are all single-threaded by default

I think this is true at the Python code level, but in practice when I run something like scikit-learn's dense PCA I will be using many threads by default, since matrix multiplication is multithreaded by default with the PyPI and conda NumPy distributions.

The only exception is BLAS/LAPACK calls, and that's because it's not possible to control at runtime in a reasonable manner

scikit-learn, via joblib and threadpoolctl, does control the threading of BLAS/LAPACK calls to avoid oversubscription when its higher-level parallelism is used.

I've been in some discussions with pandas devs as well, where the majority opinion was that single-threaded is the correct default.

I would be curious how this interacts with their increasing use of arrow via pyarrow, since pyarrow's compute and IO is multithreaded by default.

Using all threads by default seems logical at first sight, but you find soon enough that it breaks things when there's higher-level parallelism (e.g., Dask, multiprocessing).

If we are requiring some users to think more about parallelism, I think it's more reasonable for those users to be the ones using higher-level parallelism. So much code in the scientific Python ecosystem is highly multithreaded by default (pyarrow, polars, many things using numba, numpy's dot product, etc.) that I think people who want to use higher-level parallelism already need to take steps to mitigate oversubscription.

I think one of the nice things this PR does is hook into some nice tooling for higher-level parallelism. E.g., if this code is called within a joblib Parallel job, it will automatically scale the number of threads used via threadpoolctl (thanks to @alugowski's upstream improvements!).

Dask also exerts control over the number of BLAS threads each job has access to, though I believe it's a different mechanism.

User expectations

I think it would be unintuitive if a @ b was multithreaded for dense arrays and single threaded for sparse arrays by default.

Labels: C/C++, enhancement, scipy.sparse
Successfully merging this pull request may close: ENH: sparse.csr_matrix: parallelize dot product
6 participants