Skip to content

Commit

Permalink
[oneDPL][ranges][merge] support size limit for output; + draft for me…
Browse files Browse the repository at this point in the history
…rge path for the host backend (__pattern_merge_20
  • Loading branch information
MikeDvorskiy committed Nov 27, 2024
1 parent 85032a7 commit 33cd332
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 92 deletions.
141 changes: 140 additions & 1 deletion include/oneapi/dpl/pstl/algorithm_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2948,6 +2948,40 @@ __pattern_remove_if(__parallel_tag<_IsVector> __tag, _ExecutionPolicy&& __exec,
// merge
//------------------------------------------------------------------------

template<std::random_access_iterator It1, std::random_access_iterator It2, std::random_access_iterator ItOut, typename _Comp>
std::pair<It1, It2>
__brick_merge(It1 __it_1, It1 __it_1_e, It2 __it_2, It2 __it_2_e, ItOut __it_out, ItOut __it_out_e, _Comp __comp)
{
while(__it_1 != __it_1_e && __it_2 != __it_2_e)
{
if (__comp(*__it_1, *__it_2))
{
*__it_out = *__it_1;
++__it_out, ++__it_1;
}
else
{
*__it_out = *__it_2;
++__it_out, ++__it_2;
}
if(__it_out == __it_out_e)
return {__it_1, __it_2};
}

if(__it_1 == __it_1_e)
{
for(; __it_2 != __it_2_e && __it_out != __it_out_e; ++__it_2, ++__it_out)
*__it_out = *__it_2;
}
else
{
//assert(__it_2 == __it_2_e);
for(; __it_1 != __it_1_e && __it_out != __it_out_e; ++__it_1, ++__it_out)
*__it_out = *__it_1;
}
return {__it_1, __it_2};
}

template <class _ForwardIterator1, class _ForwardIterator2, class _OutputIterator, class _Compare>
_OutputIterator
__brick_merge(_ForwardIterator1 __first1, _ForwardIterator1 __last1, _ForwardIterator2 __first2,
Expand Down Expand Up @@ -2980,13 +3014,111 @@ __pattern_merge(_Tag, _ExecutionPolicy&&, _ForwardIterator1 __first1, _ForwardIt
typename _Tag::__is_vector{});
}

template<class ForwardIt, class T = typename std::iterator_traits<ForwardIt>::value_type,
class Compare>
ForwardIt lower_bound_2(ForwardIt first, ForwardIt last, const T& value, Compare comp)
{
ForwardIt it;
typename std::iterator_traits<ForwardIt>::difference_type count, step;
count = std::distance(first, last);

while (count > 0)
{
it = first;
step = count / 2;
std::advance(it, step);

std::cout << "it: " << *it << " ";
if (comp(*it, value))
{
first = ++it;
count -= step + 1;
}
else
count = step;
}

std::cout << "first: " << *first << " ";
std::cout << std::endl;
return first;
}

template<typename _IsVector, typename _ExecutionPolicy, typename _It1, typename _Index1, typename _It2,
typename _Index2, typename _OutIt, typename _Index3, typename _Comp>
std::pair<_It1, _It2>
__pattern_merge_2(__parallel_tag<_IsVector>, _ExecutionPolicy&& __exec, _It1 __it_1, _Index1 __n_1, _It2 __it_2,
_Index2 __n_2, _OutIt __it_out, _Index3 __n_out, _Comp __comp)
{
using __backend_tag = typename __parallel_tag<_IsVector>::__backend_tag;

_It1 __it_res_1;
_It2 __it_res_2;

__internal::__except_handler([&]() {
__par_backend::__parallel_for(__backend_tag{}, std::forward<_ExecutionPolicy>(__exec), _Index3(0), __n_out,
[=, &__it_res_1, &__it_res_2](_Index3 __i, _Index3 __j)
{
//a start merging point on the merge path; for each thread
_Index1 __r = 0; //row index
_Index2 __c = 0; //column index

if(__i > 0)
{
//calc merge path intersection:
const _Index3 __d_size = std::abs(std::max<_Index2>(0, __i - __n_2) - (std::min<_Index1>(__i, __n_1) - 1)) + 1;

auto __get_row = [__i, __n_1](auto __d) { return std::min<_Index1>(__i, __n_1) - __d - 1; };
auto __get_column = [__i, __n_1](auto __d) { return std::max<_Index1>(0, __i - __n_1 - 1) + __d + (__i / (__n_1 + 1) > 0 ? 1 : 0); };

oneapi::dpl::counting_iterator<_Index3> __it_d(0);

auto __res_d = *std::lower_bound(__it_d, __it_d + __d_size, 1,
[&](auto __d, auto __val) {
auto __r = __get_row(__d);
auto __c = __get_column(__d);

oneapi::dpl::__internal::__compare<_Comp, std::identity> __cmp{__comp, std::identity{}};
const auto __res = (__cmp(__it_1[__r], __it_2[__c]) ? 1 : 0);

return __res < __val;
}
);

//intersection point
__r = __get_row(__res_d);
__c = __get_column(__res_d);
++__r; //to get a merge matrix ceil, lying on the current diagonal
}

//serial merge n elements, starting from input x and y, to [i, j) output range
//reverse range1 and range 2 to keep merge stability - range 1 should be merged first
auto __res = __brick_merge(__it_1 + __r, __it_1 + __n_1,
__it_2 + __c, __it_2 + __n_2,
__it_out + __i, __it_out + __j, __comp);

//for(int k = __i; k < __j; ++k)
// std::cout << __it_out[k] << " ";
//std::cout << std::endl;

if(__j == __n_out)
{
__it_res_1 = __res.first;
__it_res_2 = __res.second;
}
}, /*_ONEDPL_MERGE_CUT_OFF*/3);
});

return {__it_res_1, __it_res_2};
}

template <class _IsVector, class _ExecutionPolicy, class _RandomAccessIterator1, class _RandomAccessIterator2,
class _RandomAccessIterator3, class _Compare>
_RandomAccessIterator3
__pattern_merge(__parallel_tag<_IsVector>, _ExecutionPolicy&& __exec, _RandomAccessIterator1 __first1,
__pattern_merge(__parallel_tag<_IsVector> __tag, _ExecutionPolicy&& __exec, _RandomAccessIterator1 __first1,
_RandomAccessIterator1 __last1, _RandomAccessIterator2 __first2, _RandomAccessIterator2 __last2,
_RandomAccessIterator3 __d_first, _Compare __comp)
{
#if 0
using __backend_tag = typename __parallel_tag<_IsVector>::__backend_tag;

return __internal::__except_handler([&]() {
Expand All @@ -2999,6 +3131,13 @@ __pattern_merge(__parallel_tag<_IsVector>, _ExecutionPolicy&& __exec, _RandomAcc
});
return __d_first + (__last1 - __first1) + (__last2 - __first2);
});
#else
auto __n_1 = __last1 - __first1;
auto __n_2 = __last2 - __first2;
auto __n_3 = __n_1 + __n_2;
__pattern_merge_2(__tag, std::forward<_ExecutionPolicy>(__exec), __first1, __n_1, __first2, __n_2, __d_first, __n_3, __comp);
return __d_first + __n_3;
#endif
}

//------------------------------------------------------------------------
Expand Down
92 changes: 3 additions & 89 deletions include/oneapi/dpl/pstl/algorithm_ranges_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -465,48 +465,12 @@ __pattern_merge(_Tag __tag, _ExecutionPolicy&& __exec, _R1&& __r1, _R2&& __r2, _
return __return_type{std::ranges::begin(__r1) + std::ranges::size(__r1), std::ranges::begin(__r2) + std::ranges::size(__r2), __res};
}

template<std::random_access_iterator It1, std::random_access_iterator It2, std::random_access_iterator ItOut, typename _Comp>
std::pair<It1, It2>
__brick_merge(It1 __it_1, It1 __it_1_e, It2 __it_2, It2 __it_2_e, ItOut __it_out, ItOut __it_out_e, _Comp __comp)
{
while(__it_1 != __it_1_e && __it_2 != __it_2_e)
{
if (__comp(*__it_1, *__it_2))
{
*__it_out = *__it_1;
++__it_out, ++__it_1;
}
else
{
*__it_out = *__it_2;
++__it_out, ++__it_2;
}
if(__it_out == __it_out_e)
return {__it_1, __it_2};
}

if(__it_1 == __it_1_e)
{
for(; __it_2 != __it_2_e && __it_out != __it_out_e; ++__it_2, ++__it_out)
*__it_out = *__it_2;
}
else
{
//assert(__it_2 == __it_2_e);
for(; __it_1 != __it_1_e && __it_out != __it_out_e; ++__it_1, ++__it_out)
*__it_out = *__it_1;
}
return {__it_1, __it_2};
}

template<typename _IsVector, typename _ExecutionPolicy, typename _R1, typename _R2, typename _OutRange, typename _Comp,
typename _Proj1, typename _Proj2>
auto
__pattern_merge(__parallel_tag<_IsVector>, _ExecutionPolicy&& __exec, _R1&& __r1, _R2&& __r2, _OutRange&& __out_r, _Comp __comp,
__pattern_merge(__parallel_tag<_IsVector> __tag, _ExecutionPolicy&& __exec, _R1&& __r1, _R2&& __r2, _OutRange&& __out_r, _Comp __comp,
_Proj1 __proj1, _Proj2 __proj2)
{
using __backend_tag = typename __parallel_tag<_IsVector>::__backend_tag;

auto __comp_2 = [__comp, __proj1, __proj2](auto&& __val1, auto&& __val2) { return std::invoke(__comp,
std::invoke(__proj1, std::forward<decltype(__val1)>(__val1)), std::invoke(__proj2,
std::forward<decltype(__val2)>(__val2)));};
Expand All @@ -517,68 +481,18 @@ __pattern_merge(__parallel_tag<_IsVector>, _ExecutionPolicy&& __exec, _R1&& __r1

_Index1 __n_1 = std::ranges::size(__r1);
_Index2 __n_2 = std::ranges::size(__r2);

_Index3 __n_out = std::min<_Index3>(__n_1 + __n_2, std::ranges::size(__out_r));

auto __it_1 = std::ranges::begin(__r1);
auto __it_2 = std::ranges::begin(__r2);
auto __it_out = std::ranges::begin(__out_r);

std::ranges::borrowed_iterator_t<_R1> __it_res_1;
std::ranges::borrowed_iterator_t<_R1> __it_res_2;

__internal::__except_handler([&]() {
__par_backend::__parallel_for(__backend_tag{}, ::std::forward<_ExecutionPolicy>(__exec), _Index3(0), __n_out,
[=, &__r1, &__r2, &__out_r, &__it_res_1, &__it_res_2](_Index3 __i, _Index3 __j)
{/*...*/

//a start merging point on the merge path; for each thread
std::ranges::range_difference_t<_R1> __x = 0;
std::ranges::range_difference_t<_R2> __y = 0;

if(__i > 0)
{
//calc merge path intersection:
const _Index3 __d_size = std::abs(std::max<_Index2>(0, __i - __n_2) - (std::min<_Index1>(__i, __n_1) - 1)) + 1;

auto __get_x = [__i, __n_1](auto __d) { return std::min<_Index1>(__i, __n_1) - __d; };
auto __get_y = [__i, __n_1](auto __d) { return std::max<_Index1>(0, __i - __n_1) + __d; };

oneapi::dpl::counting_iterator<_Index3> __it_d(0);
auto __res_d = *std::lower_bound(__it_d, __it_d + __d_size, 1,
[&](auto __d, auto __val) {
auto __x = __get_x(__d);
auto __y = __get_y(__d);

const auto __res = __comp_2(__r1[__x], __r2[__y]) ? 0 : 1;
return __res < __val;
}
);
//intersection point
__x = __get_x(__res_d);
__y = __get_y(__res_d);

}

const _Index3 __n = __j - __i;

//serial merge n elements, starting from input x and y, to [i, j) output range
auto __res = __brick_merge(__it_1 + __x, __it_1 + __n_1,
__it_2 + __y, __it_2 + __n_2,
__it_out + __i, __it_out + __j, __comp_2);

if(__j == __n_out)
{
__it_res_1 = __res.first;
__it_res_2 = __res.second;
}
});
});
auto __res = __pattern_merge_2(__tag, std::forward<_ExecutionPolicy>(__exec), __it_1, __n_1, __it_2, __n_2, __it_out, __n_out, __comp_2);

using __return_type = std::ranges::merge_result<std::ranges::borrowed_iterator_t<_R1>, std::ranges::borrowed_iterator_t<_R2>,
std::ranges::borrowed_iterator_t<_OutRange>>;

return __return_type{__it_res_1, __it_res_2, std::ranges::begin(__out_r) + __n_out};
return __return_type{__res.first, __res.second, __it_out + __n_out};
}

//TODO:
Expand Down
5 changes: 3 additions & 2 deletions include/oneapi/dpl/pstl/parallel_backend_tbb.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,11 @@ class __parallel_for_body
// wrapper over tbb::parallel_for
template <class _ExecutionPolicy, class _Index, class _Fp>
void
__parallel_for(oneapi::dpl::__internal::__tbb_backend_tag, _ExecutionPolicy&&, _Index __first, _Index __last, _Fp __f)
__parallel_for(oneapi::dpl::__internal::__tbb_backend_tag, _ExecutionPolicy&&, _Index __first, _Index __last, _Fp __f,
std::size_t __grainsize = 1)
{
tbb::this_task_arena::isolate([=]() {
tbb::parallel_for(tbb::blocked_range<_Index>(__first, __last), __parallel_for_body<_Index, _Fp>(__f));
tbb::parallel_for(tbb::blocked_range<_Index>(__first, __last, __grainsize), __parallel_for_body<_Index, _Fp>(__f));
});
}

Expand Down

0 comments on commit 33cd332

Please sign in to comment.