İş parçacığı güvenli kuyruğundaki monitör nesnesindeki gönderi hatası düzeltildi

Saberie

Active member
“Yazılım Mimarisinde Kalıplar: Nesne İzleme” başlıklı son makalemde, bir iş parçacığı güvenli kuyruğu uygulamıştım. İki ciddi hata yaptım. Üzgünüm Bugün bu hataları düzelteceğim.

Duyuru








Rainer Grimm, uzun yıllardır yazılım mimarı, ekip lideri ve eğitim yöneticisi olarak çalışmaktadır. C++, Python ve Haskell programlama dilleri üzerine makaleler yazmaktan hoşlanır, aynı zamanda sık sık uzmanlık konferanslarında konuşmaktan da keyif alır. Modernes C++ blogunda yoğun bir şekilde C++ tutkusundan bahsediyor.













Bağlamı anlamak için, önce son gönderimin hatalı uygulamasını yeniden göndermeme izin verin.


// monitorObject.cpp

#include <condition_variable>
#include <functional>
#include <queue>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>

class Monitor {
public:
void lock() const {
monitMutex.lock();
}

void unlock() const {
monitMutex.unlock();
}

void notify_one() const noexcept {
monitCond.notify_one();
}

template <typename Predicate>
void wait(Predicate pred) const { // (10)
std::unique_lock<std::mutex> monitLock(monitMutex);
monitCond.wait(monitLock, pred);
}

private:
mutable std::mutex monitMutex;
mutable std::condition_variable monitCond;
};

template <typename T> // (1)
class ThreadSafeQueue: public Monitor {
public:
void add(T val){
lock();
myQueue.push(val); // (6)
unlock();
notify_one();
}

T get(){
wait( [this] { return ! myQueue.empty(); } ); // (2)
lock();
auto val = myQueue.front(); // (4)
myQueue.pop(); // (5)
unlock();
return val;
}

private:
std::queue<T> myQueue; // (3)
};


class Dice {
public:
int operator()(){ return rand(); }
private:
std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6),
std::default_random_engine());
};


int main(){

std::cout << 'n';

constexpr auto NumberThreads = 100;

ThreadSafeQueue<int> safeQueue; // (7)

auto addLambda = [&safeQueue](int val){ safeQueue.add(val); // (8)
std::cout << val << " "
<< std::this_thread::get_id() << "; ";
};
auto getLambda = [&safeQueue]{ safeQueue.get(); }; // (9)

std::vector<std::thread> addThreads(NumberThreads);
Dice dice;
for (auto& thr: addThreads) thr = std::thread(addLambda, dice());

std::vector<std::thread> getThreads(NumberThreads);
for (auto& thr: getThreads) thr = std::thread(getLambda);

for (auto& thr: addThreads) thr.join();
for (auto& thr: getThreads) thr.join();

std::cout << "nn";

}


Örneğin ana fikri, monitör nesnesinin bir sınıf içinde kapsüllenmiş olması ve dolayısıyla yeniden kullanılabilmesidir. Sınıf Monitor birini kullan std::mutex monitörün engellenmesi gibi std::condition_variable monitörün durumu olarak. Sınıf Monitor bir izleme nesnesinin desteklemesi gereken minimum arabirimi sağlar.

ThreadSafeQueue (1)’de genişletilmiştir. std::queue iş parçacığı güvenli bir arayüze sahip olmak. ThreadSafeQueue sınıftan geliyor Monitor ve üye işlevlerini senkronize etmek için üye işlevlerini kullanın add VE get desteklemek. Üyelerin işlevleri add VE get monitör nesnesini sabitlemek için monitör kilidini kullanın. Bu özellikle iş parçacığı kasası olmayanlar için geçerlidir. myQueue. add yeni bir öğe geldiğinde bekleyen ileti dizisini bilgilendir myQueue Eklendi. Bu bildirim iş parçacığı güvenlidir. üye işlevi get (3) daha fazla ilgiyi hak ediyor. Önce wait– Temel koşul değişkeninin çağrılan üye işlevi. Bu wait-call, kayıp ve sahte uyandırmalara karşı koruma sağlamak için ek bir yükleme ihtiyaç duyar (C++ Temel Yönergeleri: Koşul değişkenlerinin tehlikelerinin farkında olun). değiştirmek için yapılan işlemler myQueue (4) ve (5) de aramayı yönettikleri için korunmalıdır. myQueue.push(val) (6) örtüşebilir. monitör nesnesi safeQueue (7) senkronize edilmiş dosyadan bir sayı almak için (8) ve (9)’daki lambda fonksiyonlarını kullanın safeQueue Ekle veya sil. ThreadSafeQueue kendisi bir sınıf şablonudur ve herhangi bir türden değer alabilir. Yüz müşteri ekliyor safeQueue 1 ile 6 arasında 100 rasgele sayı (satır 7), yüz müşteri bu 100 sayıyı aynı anda safeQueue KALDIRILDI. Program çıktısı, iş parçacığı numaralarını ve kimliklerini gösterir.








Bu programın iki ciddi sorunu var. Dietmar Kuehl VE Frank Birbacher sorunları bir e-postada açıkladılar. İşte onun sözleri. Yorumlarım italik ve kalın olarak yazılmıştır.

Duyuru

  1. İçinde ThreadSafeQueue::get() vasıtasıyla olacak Monitor::wait() eğer test edildi myQueue bir öğe içerir veya bir öğenin bulunmasını bekler. Ancak, blok sadece iç mekanda olacak wait() tutuldu, yani içinde get() nesnenin hala içeride olduğundan emin olamazsın myQueue şudur: başka bir iş parçacığı kilidi alabilir ve öğeyi kaldırabilir, bu da myQueue.front() çağrılırken tanımsız davranışa neden olur.
  2. Kopyala/taşı yapıcısı ise T bir istisna atar ThreadSafeQueue tutarsız bir durumda: hiçbir üye işlevi etkin değil, ancak muteks kilitli.
Düzeltme şu ki Monitor::wait() yalnızca şu durumlarda çağrılabilir: unique_lock Düzenlendi. Bu, örneğin, uygun bir nesne ve buna bir referans döndüren karşılık gelen (korumalı?) bir işlev sağlayan monitör tarafından elde edilebilir. wait() gereklilikler:


struct Monitor {
using Lock = std::unique_lock<std::mutex>; // could be wrapper if you prefer
[[nodiscard]] Lock receiveGuard() { return Lock(monitMutex); }
template <typename Predicate>
void wait(Lock& kerberos, Predicate pred) { monitCond.wait(kerberos, pred); }
// …
};

template <typename T>
T ThreadSafeQueue<T>::get() {
auto kerberos = receiveGuard();
wait(kerberos, [this]{ return not myQueue.empty(); });
T rc = std::move(myQueue.front());
myqueue.pop();
return rc;
}


Bu sürüm, istisna sorununu giderir get(). İçin add() monitör nesnesini yalnızca bir lock_guard kullanım:


template <typename T>
void add(T val) {
{
std::lock_guard<Monitor> kerberos(*this);
myqueue.push(std::move(val));
}
notify_one();
}


Muhtemelen bildirimi “SendGuardsarın şunu lock_guard ve bir referans condition_variable imha bildirimini içerir ve gönderir:


class SendGuard {
friend class Monitor;
using deleter = decltype([](auto& cond){ cond->notify_one(); });
std::unique_ptr<std::condition_variable, deleter> notifier;
std::lock_guard<std::mutex> kerberos;
SendGuard(auto& mutex, auto& cond): notifier(&cond), kerberos(mutex) {}
};


Hareket oluşturucu ve yok edici sabit olmalıdır public olun ve tüm arayüzü temsil edin! Bu kullanım olurdu add() hatta çok daha kolay:


template <typename T>
void add(T val) {
auto kerberos = sendGuard();
myqueue.push(val);
}


Son olarak, işte Dietmar’ın tam uygulaması. Rakamlar benimkindeki rakamlarla eşleşiyor monitorObjec.cpp Örnek.


// monitorObject.cpp

#include <condition_variable>
#include <functional>
#include <queue>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>

class Monitor {
public:
using Lock = std::unique_lock<std::mutex>;
[[nodiscard]] Lock receiveGuard() {
return Lock(monitMutex);
}

template <typename Predicate>
void wait(Lock& kerberos, Predicate pred) {
monitCond.wait(kerberos, pred);
}

class SendGuard {
friend class Monitor;
using deleter = decltype([](auto* cond){ cond->notify_one(); });
std::unique_ptr<std::condition_variable, deleter> notifier;
std::lock_guard<std::mutex> kerberos;
SendGuard(auto& mutex, auto& cond): notifier(&cond), kerberos(mutex) {}
};

SendGuard sendGuard() { return {monitMutex, monitCond}; }

private:
mutable std::mutex monitMutex;
mutable std::condition_variable monitCond;
};

template <typename T> // (1)
class ThreadSafeQueue: public Monitor {
public:
void add(T val){
auto kerberos = sendGuard();
myQueue.push(val); // (6)
}

T get(){
auto kerberos = receiveGuard();
wait(kerberos, [this] { return ! myQueue.empty(); } ); // (2)
auto val = myQueue.front(); // (4)
myQueue.pop(); // (5)
return val;
}

private:
std::queue<T> myQueue; // (3)
};


class Dice {
public:
int operator()(){ return rand(); }
private:
std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6),
std::default_random_engine());
};


int main(){

std::cout << 'n';

constexpr auto NumberThreads = 100;

ThreadSafeQueue<int> safeQueue; // (7)

auto addLambda = [&safeQueue](int val){ safeQueue.add(val); // (8)
std::cout << val << " "
<< std::this_thread::get_id() << "; ";
};
auto getLambda = [&safeQueue]{ safeQueue.get(); }; // (9)

std::vector<std::thread> addThreads(NumberThreads);
Dice dice;
for (auto& thr: addThreads) thr = std::thread(addLambda, dice());

std::vector<std::thread> getThreads(NumberThreads);
for (auto& thr: getThreads) thr = std::thread(getLambda);

for (auto& thr: addThreads) thr.join();
for (auto& thr: getThreads) thr.join();

std::cout << "nn";

}



Yukarıdaki tartışmanın bir sonucu olarak Frank, Monitor için tutarlı ve kullanımı kolay bir arayüze sahip aşağıdaki sürümü önerdi.


// threadSafeQueue.cpp

#ifndef INCLUDED_PATTERNS_MONITOR2_MONITOR_HPP
#define INCLUDED_PATTERNS_MONITOR2_MONITOR_HPP

#include <atomic>
#include <algorithm>
#include <condition_variable>
#include <deque>
#include <iterator>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>


class Monitor {
public:
struct UnlockAndNotify {
std::mutex d_mutex;
std::condition_variable d_condition;

void lock() { d_mutex.lock(); }
void unlock() { d_mutex.unlock(); d_condition.notify_one(); }
};

private:
UnlockAndNotify d_combined;

public:
std::unique_lock<UnlockAndNotify> makeLockWithNotify() {
return std::unique_lock{d_combined};
}

template <typename PRED>
std::unique_lock<std::mutex> makeLockWithWait(PRED waitForCondition) {
std::unique_lock lock{d_combined.d_mutex};
d_combined.d_condition.wait(lock, waitForCondition);
return lock;
}
};

class ThreadQueue {
Monitor d_monitor;
std::deque<int> d_numberQueue;

auto makeLockWhenNotEmpty() {
return d_monitor.makeLockWithWait([this] { return !d_numberQueue.empty(); });
}

public:
void addNumber(int number) {
const auto lock = d_monitor.makeLockWithNotify();
d_numberQueue.push_back(number);
}

int removeNumber() {
const auto lock = makeLockWhenNotEmpty();
const auto number = d_numberQueue.front();
d_numberQueue.pop_front();
return number;
}
};

#endif

int main() {
ThreadQueue queue;
std::atomic<int> sharedSum{};
std::atomic<int> sharedCounter{};

std::vector<std::jthread> threads;
threads.reserve(200);
std::generate_n(std::back_inserter(threads), 100, [&] {
return std::jthread{[&] { sharedSum += queue.removeNumber(); }};
});
std::generate_n(std::back_inserter(threads), 100, [&] {
return std::jthread{[&] { queue.addNumber(++sharedCounter); }};
});

threads.clear(); // wait for all threads to finish
if (sharedSum.load() != 5050) {
throw std::logic_error("Wrong result for sum of 1..100");
}
}



Monitor nesnesinin uygulanması, esnekliğine dayanır. std::unique_lock şablon parametresi aracılığıyla. Tüm standart C++ blokları, kullanan herhangi bir sınıfla kullanılabilir. lock()– VE unlock()-Yöntemler. Sınıf UnlockAndNotify bu arayüzü uygulayın ve durum değişkenini dosyada ayarlayın unlock()-Ücretsiz yöntem. Ayrıca, sınıf sunar Monitor kullanarak biri bildirimli diğeri bildirimsiz olmak üzere iki farklı türde blok oluşturmak için kullanılabilecek sadeleştirilmiş bir genel arabirim sağlar. std::unique_lock ikisi de genel UnlockAndNotify-örnek veya yalnızca dahil std::mutex yaratılır.

arasında seçim yaparken std::unique_lock VE std::lock_guard Tercih ederim (Frank) unique_lock arayüzde. Bu seçim, monitör sınıfı kullanıcısına daha fazla esneklik sağlar. Bu esnekliğe olası bir performans farkından daha çok değer veriyorum lock_guardki bu hala ölçülmelidir. Verilen örneklerin bu ek esneklikten yararlanmadığını itiraf etmeliyim.

Daha sonra Dietmar, Frank’in fikrini daha da geliştirdi: Burada, korunan veriler monitörde saklanıyor, bu da korumasız erişimi zorlaştırıyor.


// threadsafequeue2.cpp

#ifndef INCLUDED_PATTERNS_MONITOR3_MONITOR_HPP
#define INCLUDED_PATTERNS_MONITOR3_MONITOR_HPP

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <iterator>
#include <mutex>
#include <random>
#include <stdexcept>
#include <thread>
#include <tuple>
#include <vector>

namespace patterns::monitor3 {

template <typename T>
class Monitor {
public:
struct UnlockAndNotify {
std::mutex d_mutex;
std::condition_variable d_condition;

void lock() { d_mutex.lock(); }
void unlock() { d_mutex.unlock(); d_condition.notify_one(); }
};

private:
mutable UnlockAndNotify d_combined;
mutable T d_data;

public:
std::tuple<T&, std::unique_lock<UnlockAndNotify>> makeProducerLock() const {
return { d_data, std::unique_lock{d_combined} };
}

template <typename PRED>
std::tuple<T&, std::unique_lock<std::mutex>> makeConsumerLockWhen(PRED predicate) const {
std::unique_lock lock{d_combined.d_mutex};
d_combined.d_condition.wait(lock, [this, predicate]{ return predicate(d_data); });
return { d_data, std::move(lock) };
}
};

template <typename T>
class ThreadQueue {
Monitor<std::deque<T>> d_monitor;

public:
void add(T number) {
auto[numberQueue, lock] = d_monitor.makeProducerLock();
numberQueue.push_back(number);
}

T remove() {
auto[numberQueue, lock] = d_monitor.makeConsumerLockWhen([](auto& numberQueue) { return !numberQueue.empty(); });
const auto number = numberQueue.front();
numberQueue.pop_front();
return number;
}
};
}

#endif

class Dice {
public:
int operator()(){ return rand(); }
private:
std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6),
std::default_random_engine());
};

int main(){

std::cout << 'n';

constexpr auto NumberThreads = 100;

patterns::monitor3::ThreadQueue<int> safeQueue;

auto addLambda = [&safeQueue](int val){ safeQueue.add(val);
std::cout << val << " "
<< std::this_thread::get_id() << "; ";
};
auto getLambda = [&safeQueue]{ safeQueue.remove(); };

std::vector<std::thread> addThreads(NumberThreads);
Dice dice;
for (auto& thr: addThreads) thr = std::thread(addLambda, dice());

std::vector<std::thread> getThreads(NumberThreads);
for (auto& thr: getThreads) thr = std::thread(getLambda);

for (auto& thr: addThreads) thr.join();
for (auto& thr: getThreads) thr.join();

std::cout << "nn";

}


tekrar teşekkürler Frank VE Diyetmar. Son makalemde iş parçacığı güvenli sırasını yanlış uygulamamla eşzamanlılığın çok talepkar olduğunu kanıtlamak istemedim. Özellikle can sıkıcı olan, muteksi bir bloğa koymamış olmamdır (hata 2). Bunu C++ kurslarımda öğretiyorum: NNM (Çıplak Mutex Yok).

Sıradaki ne?


Bir sonraki makalemde C++20’nin geleceğini inceleyeceğim: C++23.


(rm)



Haberin Sonu
 
Üst