diff --git a/kokkos-testing/fd_catalog/CMakeLists.txt b/kokkos-testing/fd_catalog/CMakeLists.txt
index 1927a55d9f6253569f711e3e1efb13d1eaa03755..80f18c11424046d6dff9a4f80484bb1212cb59c9 100644
--- a/kokkos-testing/fd_catalog/CMakeLists.txt
+++ b/kokkos-testing/fd_catalog/CMakeLists.txt
@@ -126,7 +126,7 @@ endif()
 
 
 set(CMAKE_CXX_FLAGS_DEBUG "-g")
-set(CMAKE_CXX_FLAGS_RELEASE "-O3 -march=native -g -DNDEBUG")
+set(CMAKE_CXX_FLAGS_RELEASE "-O3 -march=native -fopenmp -g -DNDEBUG")
 set(CMAKE_CXX_FLAGS_RELEASEASSERT "-O3 -march=native -g -fpermissive")
 
 macro(setup_fd_catalog_target FD_TGT_NAME SINGLE_PRECISION)
diff --git a/kokkos-testing/fd_catalog/fd_main.cpp b/kokkos-testing/fd_catalog/fd_main.cpp
index 0f1a8c1a6f38952dcacb28be665f6e303553e5d2..584cf0114b70a6e261902330e7aef4af6827cd99 100644
--- a/kokkos-testing/fd_catalog/fd_main.cpp
+++ b/kokkos-testing/fd_catalog/fd_main.cpp
@@ -56,9 +56,9 @@ int main(int argc, char **argv)
    time = solve_sequential(wec);
    ofs << time << " ";
 
-    wec.init();
-    time = solve_sequential_blocked(wec);
-    ofs << time << " ";
+    //wec.init();
+    //time = solve_sequential_blocked(wec);
+    //ofs << time << " ";
 
 #ifdef HAVE_CUDA
    wec.init();
diff --git a/kokkos-testing/fd_catalog/fd_wave_cpu.hpp b/kokkos-testing/fd_catalog/fd_wave_cpu.hpp
index 120620ba98cecf031cb7324651084420dc4337b9..c13ea4666e28d8a7e07ece9103ed706e08043117 100644
--- a/kokkos-testing/fd_catalog/fd_wave_cpu.hpp
+++ b/kokkos-testing/fd_catalog/fd_wave_cpu.hpp
@@ -378,6 +378,7 @@ wave_2D_kernel(const fd_grid<T>& g_prev, const fd_grid<T>& g_curr,
     T one_minus_adt = (1.0 - a*dt);
     T two_minus_adt = (2.0 - a*dt);
 
+    //#pragma omp parallel for
     for (size_t i = from; i < maxrow; i+=to)
     {
         #pragma clang loop vectorize(enable)
@@ -527,6 +528,88 @@ public:
 };
 
+
+
+class Barrier {
+public:
+    explicit Barrier(std::size_t iCount) :
+        mThreshold(iCount),
+        mCount(iCount),
+        mGeneration(0) {
+    }
+
+    void Wait() {
+        std::unique_lock<std::mutex> lLock{mMutex};
+        auto lGen = mGeneration;
+        if (!--mCount) {
+            mGeneration++;
+            mCount = mThreshold;
+            mCond.notify_all();
+        } else {
+            mCond.wait(lLock, [this, lGen] { return lGen != mGeneration; });
+        }
+    }
+
+private:
+    std::mutex mMutex;
+    std::condition_variable mCond;
+    std::size_t mThreshold;
+    std::size_t mCount;
+    std::size_t mGeneration;
+};
+
+/*
+class Barrier
+{
+private:
+    std::mutex m_mutex;
+    std::condition_variable m_cv;
+
+    size_t m_count;
+    const size_t m_initial;
+
+    enum State : unsigned char {
+        Up, Down
+    };
+    State m_state;
+
+public:
+    explicit Barrier(std::size_t count) : m_count{ count }, m_initial{ count }, m_state{ State::Down } { }
+
+    /// Blocks until all N threads reach here
+    void Wait()
+    {
+        std::unique_lock<std::mutex> lock{ m_mutex };
+
+        if (m_state == State::Down)
+        {
+            // Counting down the number of syncing threads
+            if (--m_count == 0) {
+                m_state = State::Up;
+                m_cv.notify_all();
+            }
+            else {
+                m_cv.wait(lock, [this] { return m_state == State::Up; });
+            }
+        }
+
+        else // (m_state == State::Up)
+        {
+            // Counting back up for Auto reset
+            if (++m_count == m_initial) {
+                m_state = State::Down;
+                m_cv.notify_all();
+            }
+            else {
+                m_cv.wait(lock, [this] { return m_state == State::Down; });
+            }
+        }
+    }
+};
+*/
+
+
+
 #define USE_SPINLOCK
 
 template<typename T>
 double solve_multithread(wave_equation_context<T>& wec, size_t nths)
@@ -539,6 +622,7 @@ double solve_multithread(wave_equation_context<T>& wec, size_t nths)
     params.velocity = wec.velocity;
     params.damping = wec.damping;
 
+    Barrier pb(nths+1), cb(nths+1);
 
     /* Multithreading stuff */
 #ifdef USE_SPINLOCK
@@ -561,13 +645,7 @@ double solve_multithread(wave_equation_context<T>& wec, size_t nths)
         while (1)
         {
 #ifdef USE_SPINLOCK
-            splock.lock();
-            int done = thread_done[thread_id];
-            splock.unlock();
-
-            if (done)
-                continue;
-
+            pb.Wait();
             if (iteration_finished)
                 return;
 #else
@@ -590,16 +668,15 @@ double solve_multithread(wave_equation_context<T>& wec, size_t nths)
 
             /* Work for this thread finished, notify producer */
 #ifdef USE_SPINLOCK
-            splock.lock();
-            thread_done[thread_id] = 1;
             times[thread_id] += ms.count();
-            splock.unlock();
+            cb.Wait();
 #else
             std::unique_lock<std::mutex> lck(cv_mtx);
             prod_cv.notify_one();
             thread_done[thread_id] = 1;
             times[thread_id] += ms.count();
 #endif /* USE_SPINLOCK */
+        }
     };
 
 
@@ -617,23 +694,8 @@ double solve_multithread(wave_equation_context<T>& wec, size_t nths)
         auto start = std::chrono::high_resolution_clock::now();
 
 #ifdef USE_SPINLOCK
-        splock.lock();
-        for (auto& td : thread_done)
-            td = 0;
-        splock.unlock();
-
-        while(1)
-        {
-            int ttd = 0;
-
-            splock.lock();
-            for (auto& td : thread_done)
-                ttd += td;
-            splock.unlock();
-
-            if (ttd == nths)
-                break;
-        }
+        pb.Wait();
+        cb.Wait();
 #else
         std::unique_lock<std::mutex> lck(cv_mtx);
         /* Mark data ready and start the threads */
@@ -645,6 +707,7 @@ double solve_multithread(wave_equation_context<T>& wec, size_t nths)
         prod_cv.wait(lck);
 #endif /* USE_SPINLOCK */
 
+
         auto stop = std::chrono::high_resolution_clock::now();
         std::chrono::duration<double, std::milli> ms = stop - start;
         time += ms.count();
@@ -666,11 +729,9 @@ double solve_multithread(wave_equation_context<T>& wec, size_t nths)
 
     /* Tell all the threads to stop */
 #ifdef USE_SPINLOCK
-    splock.lock();
-    for (size_t i = 0; i < nths; i++)
-        thread_done[i] = 0;
     iteration_finished = true;
-    splock.unlock();
+    pb.Wait();
+
 #else
     {
         std::unique_lock<std::mutex> lck(cv_mtx);
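
Note (not part of the patch): the change replaces the spinlock/thread_done polling handshake with two reusable generation-counting barriers, pb to release the workers at the start of a timestep and cb to collect them at the end, each sized nths+1 so the producer takes part in both rendezvous. The standalone sketch below shows that protocol in isolation; the worker body, the sizes nths and iters, and the main() scaffolding are illustrative assumptions, not code from fd_wave_cpu.hpp.

// barrier_demo.cpp -- minimal sketch of the pb/cb handshake (assumed sizes).
#include <condition_variable>
#include <cstddef>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

class Barrier {
public:
    explicit Barrier(std::size_t n) : mThreshold(n), mCount(n), mGeneration(0) {}
    void Wait() {
        std::unique_lock<std::mutex> lock{mMutex};
        auto gen = mGeneration;
        if (!--mCount) {            // last arrival: open the barrier
            mGeneration++;
            mCount = mThreshold;    // auto-reset for the next round
            mCond.notify_all();
        } else {
            mCond.wait(lock, [this, gen] { return gen != mGeneration; });
        }
    }
private:
    std::mutex mMutex;
    std::condition_variable mCond;
    std::size_t mThreshold, mCount, mGeneration;
};

int main() {
    const std::size_t nths = 4, iters = 3;  // hypothetical sizes for the demo
    Barrier pb(nths + 1), cb(nths + 1);     // producer + nths workers
    bool iteration_finished = false;        // written only while workers are parked in pb

    std::vector<std::thread> threads;
    for (std::size_t id = 0; id < nths; id++) {
        threads.emplace_back([&, id] {
            while (true) {
                pb.Wait();                       // wait for "go" from the producer
                if (iteration_finished) return;  // flag was set before the release
                std::printf("worker %zu: computing one timestep\n", id);
                cb.Wait();                       // report completion
            }
        });
    }

    for (std::size_t it = 0; it < iters; it++) {
        pb.Wait();  // release all workers for this timestep
        cb.Wait();  // block until every worker has finished it
    }

    iteration_finished = true;  // safe: all workers are blocked in pb.Wait()
    pb.Wait();                  // final release so workers observe the flag and exit

    for (auto& t : threads) t.join();
}

The two-barrier split gives the producer a completion point (cb) distinct from the release point (pb), so the per-step timing accumulated in times[thread_id] stays meaningful and no worker can be released into step n+1 while another is still in step n. The shutdown path mirrors the patch: iteration_finished is stored while every worker is parked in pb.Wait(), and the barrier's internal mutex publishes that store before any worker reads the flag.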