Справочник функций

Ваш аккаунт

Войти через: 
Забыли пароль?
Регистрация
Информацию о новых материалах можно получать и без регистрации:

Последние темы форума

Показать новые сообщения »

Почтовая рассылка

Подписчиков: 11849
Последний выпуск: 19.06.2015

Синхронизация потоков в OC Linux

Автор: Павлов А.Ю.
www.intser.fatal.ru

Если вы хотя бы раз разрабатывали многопоточное приложение, не важно под какую ос - думаю вы сталкивались с понятием синхронизации. Т.е. я не собираюсь досконально разбирать что это такое и для чего нужно - есть масса руководств по этой теме :). Здесь будет рассматриваться один конкретный случай синхронизации потоков применительно к ОС Linux. И так приступим.

Для создания потока используется библиотека pthread и вызов pthread_create, для синхронизации в этой же библиотеке описаны специальные объекты - мутексы. Мутекс - это объект который может принадлежать в некий момент времени только одному потоку и имеющий два состояния - занят и свободен. Поток пытающийся получить доступ к мутексу в случае если последний занят будет остановлен системой до освобождения объекта. На этом собственно и основана синхронизация - перед использованием общего ресурса потоки сначала обращаются к мутексу и в конечном счете выстраиваются в очередь.

Все отлично работает - но рассмотрим ситуацию когда есть много потоков выполняющих только чтение некоего ресурса(они не нуждаются в синхронизации между собой) и один или несколько потоков выполняющих изменение этого ресурса. Т.е. синхронизировать нужно этот пишущий поток со всеми читателями. Если просто использовать мутекс для синхронизации доступа к ресурсу очевидно что и потоки чтения будут получать доступ туда последовательно, что в свою очередь замедлит приложение, фактически превратив его в однопоточное :). В этой ситуации одним из вариантов решения будет применение семафора. Семафор - это специальный объект ядра предназначенный для взаимодействия процессов в системе. Не буду утомлять описанием этого объекта, это прекрасно сделали до меня, например тут Linux Interprocess Communications. Упомяну только что в системе создается именованное множество семафоров(содержащее минимум 1 семафор), каждый семафор содержит некое количество ресурсов выраженное целым числом. Поток может запрашивать ресурсы у семафора и естественно должен отдавать их обратно когда они более не нужны. В случае если запрошенное количество ресурсов недоступно, но меньше максимального имеющегося количества - поток ожидает освобождения(в случае если не установлена опция - без ожидания)- иначе возвращается ошибка. Таким образом семафор более гибкий механизм синхронизации(однако неприятность в том что это объект ядра). Попробуем применить его к озвученной выше задаче.

Для облегчения работы рассмотрим объектный интерфейс реализующий множество семафоров состоящее из одного семафора: В примере 2 файла - можно безболезненно слить их в один, например sem.cpp :)

/*sem.h*/
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>

// semafore
#include <ctype.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/sem.h>

/*
В моей системе данное объединение не объявлено в подключенных файлах -
если компилятор напишет что оно ранее объявлено - просто сотрите :)
*/
union semun {
    int      val;    /* Value for SETVAL */
    struct semid_ds *buf;    /* Buffer for IPC_STAT, IPC_SET */
    unsigned short  *array;  /* Array for GETALL, SETALL */
    struct seminfo  *__buf;  /* Buffer for IPC_INFO
         (Linux specific) */
    };    


class sem
    {
    private:
        int sid;       // идентификатор семафора
        key_t key;     // ключ по которому получаем идентификатор
        int res_count; // количество ресурсов у семафора
    public:
        /*
        кол-во ресурсов
        некое случайное число
        путь в системе - обязательно должен существовать !
        */
        sem(int max_res, int id, const char* identify);
        /*
        деструктор - тут удаляем семафор, иначе он останется до
        следующей перезагрузки системы или пока его кто-то явно не удалит
        */
        ~sem();
        /*
        обертки для занятия/освобождения ресурсов
        */
        bool lock(int res);
        bool unlock(int res);    
    };
    
typedef sem* psem;
;

Cледующий файл:

/*sem.cpp*/
#include "sem.h"

sem::sem(int max_res, int id, const char* identify)
    {
    sid = -1;
    res_count = 0;
    /*
    получаем ключ для семафора
    */
    if ((key = ftok(identify,id)) == -1)
        {
        return;
        }
    
    /*
    0666 - rw для всех, чтобы потом можно было обратиться к семафору
    от любого пользователя системы, в общем то личное дело каждого
    сначала пытаемся открыть имеющийся семафор с таким ключом - и
    удалить его, старый нам ни к чему
    */
    if ((sid = semget(key, 0, 0666)) != -1)
        {
        if (semctl(sid, 0, IPC_RMID, 0) == -1)
            {
            sid = 0;
            }
        }
        
    if (sid != 0)    // проверяем что семафор был найден и удален или не существовал
        {
        /*
        создаем с флагом IPC_EXCL - означает что в случае если семафор уже
        имеется - вызов будет провален c значением EEXIST,
        без возврата значения отрытого уже существующего семафора
        */
        if ((sid = semget( key, 1, IPC_CREAT | IPC_EXCL | 0666 )) != -1)    // 
            {
            union semun semopts;
                semopts.val = max_res;
              semctl(sid, 0, SETVAL, semopts);
              res_count = max_res;
            }
          } else sid = -1;              
    }

sem::~sem()
    {
    if (sid != -1)
        {
        semctl(sid, 0, IPC_RMID, 0);
        }
    }
    
bool sem::lock(int res)
    {    
    /*
    отсекаем неверные запросы сразу, не используя обращения к структурам семафора
    */
    if ((res > res_count) || (sid == -1))
        {
        return false;
        }
    /*
    параметры в структуре
    0 - номер семафора
    количество ресурсов - если захватываем, должно быть отрицательным
    0 - ждать если на данный момент нет достаточного
        количества ресурсов или IPC_NOWAIT - возвращать ошибку
    */
    struct sembuf sem_lock={0,(-1)*res,0};
    /*
    параметры запроса
    - идентификатор семафора
    - структура которую заполняли выше - ее адрес
    - сколько раз выполнить операцию
    */
    if ((semop(sid, &sem_lock, 1)) == -1)
          {
          return false;
          }
      return true;
    }
    
bool sem::unlock(int res)
    {
    /*
    аналогично функции lock
    */
    if ((res > res_count) || (sid == -1))
        {
        return false;
        }
    struct sembuf sem_unlock= { 0, res, IPC_NOWAIT};
      if ((semop(sid, &sem_unlock, 1)) == -1)
          {
          return false;
         }
    return true;
    }

Создадим семафор с 10 ресурсами например, читающие потоки будут запрашивать по 1 ресурсу за раз, пишущий сразу 10. В итоге читающие потоки будут блокировать друг друга только в случае если их запущенное число превысит 10(нужно самостоятельно подбирать нужное значение). Поток записи же при старте будет гарантированно ждать освобождения ресурса всеми потоками чтения и заставлять ждать позднее стартующие потоки окончания своей работы. Хочу обратить внимание на один нюанс - перед запросом получения ресурса потоком чтения следует ставить запрос на разрешение обратиться за получением этого самого ресурса. Т.е. поток чтения в случае если не установлен некий флаг разрешения зацикливается и ждет пока его установят, только после этого приступая к запросу. Это нужно для того чтобы потоки чтения не оттесняли поток записи в конец цикла ожидания - например освобождено уже 9 ресурсов из 10, поток записи ждет, но запускается поток чтения, запрашивает ресурс и естественное его получает - в итоге 8 свободных ресурсов :), запись оттесняется. Кстати очень удобно если требуется выполнение какой-либо работы во время простоя системы :).

Рассмотрим пример программы:

/* stest.cpp */

#include <sys/types.h>
#include <unistd.h>
#include <errno.h>

// threads
#include <pthread.h>
// semaphore
#include "sem.h"


#define max_rs 10

using namespace std;

/*
Флаг остановки доступа к ресурсам
*/
bool stop_access = false;

// процедура потока чтения
void* read_proc(void* arg)
    {
    if (arg == NULL)
    {
    cerr << "Argument empty \n";
    return NULL;
    }
    int pid = getpid();
    // ждем разрешения
    while(stop_access)
    {}
    // получаем ресурс
    if (((psem)(arg))->lock(1))
    {
        // типа что-то делаем :)
    cout << "reader lock " << pid << endl;
    sleep(1);
    cout << " unlock " << pid << endl;
    if (((psem)(arg))->unlock(1))
     {
     }
    }

    return NULL;
    }
// процедура потока записи
void* write_proc(void* arg)
    {
    if (arg == NULL)
    {
    cerr << "Argument empty \n";
    return NULL;
    }
    int pid = getpid();
    // запрещаем обращение к ресурсам
    stop_access = true;
    if (((psem)(arg))->lock(10))
    {
    stop_access = false;
    cout << "writer lock " << pid << endl;
    sleep(4);
    cout << " unlock " << pid << endl;
    if (((psem)(arg))->unlock(10))
     {

     }
    }
    return;
    }

main(int argc,char* argv[])
    {
        sem* sm = new sem(10,getpid(),".");
    // инициализируем потоки
        pthread_attr_t attr;
        pthread_attr_init(&attr);
    // отсоединенный поток - не ждем его возврата
    pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
    int ret;
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE , &ret);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &ret);
    int i = 0;
    pthread_t thread_handle;
    // последовательный запуск 8-ми потоков
    for (i = 0;  i < 8; i++)
    {
    if (pthread_create(&thread_handle,NULL,&read_proc,sm) != 0)
     {
     cout << strerror(errno) << endl;
     }    
        // запускаем поток записи на 3-е итерации 
    if (i == 2)
     {
     if (pthread_create(&thread_handle,NULL,&write_proc,sm) != 0)
     {
     cout << strerror(errno) << endl;
     }
     }
    }
    // задержка выхода - чтобы увидеть результат
    sleep(10);
    sm->~sem();
    return 0;
    }

Собирается эта программа очень просто: g++ -g stest.cpp sem.cpp -lpthread -o stest

Вывод примерно такой:

reader lock 31718
reader lock 31719
reader lock 31720
 unlock 31718
 unlock 31719
 unlock 31720
writer lock 31721
 unlock 31721
reader lock 31726
reader lock 31725
reader lock 31723
reader lock 31724
reader lock 31722
 unlock  unlock 31726 unlock 31723
 unlock 31724
 unlock 31722
31725

Заметно что запустившийся поток записи заблокировал ресурс, отработал и только после этого дал зеленый свет остальным потокам. Потоки чтения не блокируют друг друга - об этом можно судить по порядку вывода записей освобождения ресурсов - происходит не в порядке захвата.

Собственно на этом заканчивается мое повествование, для более подробной информации по семафорам можно пройти по ссылке которая упоминалась выше. Надеюсь этот материал поможет вам в решении ваших задач.

Оставить комментарий

Комментарий:
можно использовать BB-коды
Максимальная длина комментария - 4000 символов.
 

Комментарии

1.
442
30 января 2007 года
Jail
550 / / 30.01.2007
+1 / -0
Мне нравитсяМне не нравится
24 марта 2007, 18:35:24
Неплохая статья,но слишком зжато. В сети нашёл описание гораздо более стоящее и обстоятельное. Но всё равно спасибо. Для начинающего прогера линух(например), эта статейка будет не совсем понятна)))))
Реклама на сайте | Обмен ссылками | Ссылки | Экспорт (RSS) | Контакты
Добавить статью | Добавить исходник | Добавить хостинг-провайдера | Добавить сайт в каталог