[LinuxFocus-icon]
<--  | Домой  | Карта  | Индекс  | Поиск

Новости | Архивы | Ссылки | Про LF
эта страница доступна на следующих языках: English  Castellano  Deutsch  Francais  Nederlands  Russian  Turkce  

[Leonardo]
автор Leonardo Giordani
<leo.giordani(at)libero.it>

Об авторе:

Студент Политехнического Университета Милана, учится на факультете телекоммуникационных технологий, работает сетевым администратором и интересуется программированием (в основном на Ассемблере и C/C++). С 1999 практически постоянно работает в Linux/Unix



Перевод на Русский:
Kirill Pukhlyakov <kirill(at)linuxfocus.org>

Содержание:

 

Параллельное программирование - очереди сообщений (1)

[run in paralell]

Резюме:

Цель этой серии заметок - знакомство читателя с идеей многозадачности и ее реализация в ОС Linux. Начав эту серию с теоретических основ многозадачности - мы закончим написанием приложения, демонстрирующего взаимодействие между процессами посредством простого, но эффективного протокола.

С чем должны быть знакомы читатели для понимания заметки:

  • минимальные знания shell'а
  • основы языка 'C' (синтаксис, циклы, библиотеки)

Также неплохо прочитать предыдущие заметки из этой серии на сайте LinuxFocus ( Ноябрь2002 и Январь2003 ).
_________________ _________________ _________________

 

Вступление

В прошлых заметках мы начали изучать параллельное программирование, в частности узнали как осуществить межпроцессорное взаимодействие с помощью семафоров. Используя их мы можем управлять доступом к разделяемым ресурсам, что позволяет нам синхронизировать работу двух и более процессов.

Задача синхронизации - выделение процессам времени на работу, но не абсолютного времени сиситемы ( когда назначается точное время начала работы ), а относительное, когда мы можем выделить время для какого-либо процесса в первую очередь, а для другого - во вторую и т.д.

Использование семафоров для этого достаточно сложное и к тому же ограниченное решение - сложное потому, что каждый процесс должен управлять семафором для другого, с которым необходимо синхронизироваться, а ограниченное, потому, что это не позволяет нам осуществлять обмен параметрами между процессами. Представьте ситуацию создания нового процесса - событие создания его должно быть послано каждому выполняющемуся процессу - семафоры не позволяют сделать этого.

Кроме того, параллельный контроль доступа к разделяемым ресурсам посредством семафоров может привести к длительной блокировке процесса, когда другой процесс освободит и вновь заблокирует ресурс до того как какой-н другой процесс возьмет контроль над ним. Как мы уже узнали, в мире параллельного программирования невозможно узнать заранее какой и когда процесс будет запущен.

Эти короткие размышления позволяют нам понять, что семафоры - неподходящий инструмент для решения сложных проблем синхронизации. Элегантное решение - использование очередей сообщений - в этой заметке мы рассмотрим теорию взаимодействия процессов и напишем небольшую программу, используя средства SysV.

 

Теория очереди сообщений

Каждый процесс способен создать любое количество структур называемых очередями: каждая структура может содержать любое количество сообщений разных типов, которые имеют разную природу и содержат любую информацию; каждый процесс способен послать сообщение в очередь, зная ее идентификатор. Также процесс способен получить доступ к очереди и прочитать сообщения в хронологическом порядке ( начиная с самого первого, последнего, самого недавнего и последнего, поступившего в очередь ), но выборочно, т.е. сообщения только определенного типа, что обеспечивает контроль за возможностью прочитать эти сообщения.

Использование очереди легко понять представив ее в виде почтовой системы между процессами: каждый процесс обладает адресом для взаимодействия с другими процессами. Процесс читает сообщения, предназначенные ему и дальнейшая его работа зависит от этих сообщений.

Итак, синхронизация двух процессов достигается использованием сообщений: ресурсы по-прежнему будут обладать семафорами, чтобы процессы знали их статус, а разделение времени работы происходит при помощи сообщений. Теперь вы понимаете, что использование сообщений не такая большая сложность как это могло показаться сначала.

Перед тем как мы узнаем как использовать механизм сообщений в языке 'C' необходимо рассмотреть еще один аспект синхронизации - коммуникационный протокол.

 

Создание протокола

Протоколом называется набор правил, с помощью которого происходит взаимодействие объектов; в прошлой заметке мы рассмотрели создание одного из самых простых протоколов на основе семафоров, который заставляет процессы работать в соответствии с их статусом. Использование очередей сообщений позволяет нам создавать более сложные протоколы; важно понять, что все сетевые протоколы (TCP/IP, DNS, SMTP, ...) построены на архитектуре обмена сообщениями. Все достаточно просто - нет никакой разницы на одном ли компьютере происходит взаимодействие процессов или между разными. Как мы увидим в следующей заметке, рассматривая распределенную работу ( несколько компьютеров во взаимодействии ) - это все очень просто.

Рассмотрим простой протокол, основанный на обмене сообщениями: два процесса ( А и В ) выполняются параллельно и работают с разными данными: после окончания работы каждого им необходимо обменяться данными. Простой протокол их взаимодействия выглядит следующим образом:

ПРОЦЕСС B:


ПРОЦЕСС A:

Выбор процесса ответственного за объединение данных достаточно условен в нашем примере; в реальной жизни это зависит от природы взаимодействующих процессов ( клиент/сервер ), но это тема отдельной заметки.

Рассмотренный протокол легко применим к n процессам. Любой процесс, кроме А, обрабатывает свои данные и затем посылает сообщение процессу А. После ответа процесса А ему пересылаются данные, нет необходимости менять структуру процессов, кроме А.

 

Очереди сообщений в System V

Теперь рассмотрим реализацию этого механизмав ОС Linux. Как было сказано ранее нам доступен набор примитивов для работы со структурами, представляющими основу механизма сообщений и эта работа схожа с управлением семафорами: я надеюсь, что читатель знаком с основами создания процессов, использования системных вызывов и IPC ключами.

Структура для описания сообщения называется msgbuf ;и объявлена в linux/msg.h

/* message buffer for msgsnd and msgrcv calls */
struct msgbuf {
        long mtype;         /* type of message */
        char mtext[1];      /* message text */
};

Поле mtype описывает тип сообщения и всегда имеет положительное значение: соответствие между типами сообщений и их числовым представлением необходимо определить заранее - это часть протокола. Второе поле представляет само сообщение. Структура msgbuf может быть переопределена и содержать более сложные данные; например мы можем определить ее так:
struct message {
        long mtype;         /* message type */
        long sender;        /* sender id */
        long receiver;      /* receiver id */
        struct info data;   /* message content */
        ...
};

Прототип сообщения может иметь максимальный размер до 4056 байт. Естественно, мы можем перекомпилировать ядро и увеличить это значение, но, тогда наше приложение станет непереносимым ( кроме того, этот размер был точно вычислен и его большое увеличение не является хорошей идеей ).

Для создания новой очереди процесс использует функцию msgget()

int msgget(key_t key, int msgflg)
в которую необходимо передать аргументы и IPC ключ, который можно установить в
IPC_CREAT | 0660
( создать очередь, если она еще не существует и предоставить доступ владельцу и группе 'users' ). Эта функция возвращает идентификатор очереди.

Как и в предыдущих заметках, мы подразумеваем, что ошибки не возникли, для упрощения нашего кода и в будущей заметке поступим также, несмотря на то, что будем говорить о безопасном IPC коде.

Чтобы послать сообщение в очередь, зная ее идентификатор, необходимо использовать функцию msgsnd()

int msgsnd(int msqid, struct msgbuf *msgp, int msgsz, int msgflg)

где msqid идентификатор очереди, msgp указатель на сообщение, которое мы хотим послать ( тип которого определен как struct msgbuf но который мы переопределили ), msgsz размер сообщения ( исключая длину mtype типа long, т.е. 4 байта ) и msgflg флаг правила ожидания. Размер сообщения достаточно просто получить, используя следующее выражение:
length = sizeof(struct message) - sizeof(long);

что касается правила ожидания в случае полной очереди: если msgflg установлен в IPC_NOWAIT посылающий сигнал процесс не будет ждать освобождения очереди и выйдет с кодом ошибки: подробнее мы поговорим об этом позднее, когда будем рассматривать вопрос обработки ошибок.

Чтобы прочитать сообщения, находящиеся в очереди необходимо использовать системную функцию msgrcv()

int msgrcv(int msqid, struct msgbuf *msgp, int msgsz, long mtype, int msgflg)


где msgp указатель на буфер, в котором мы будем читать сообщения, а mtype определяет список интересующих нас сообщений.

Удаление очереди осуществляется вызовом функции msgctl() с флагом IPC_RMID

msgctl(qid, IPC_RMID, 0)

Теперь посмотрим на примере все о чем говорили выше: мы создадим очередь сообщений, пошлем в нее сообщение и прочитаем его, таким образом проконтролировав корректную работу системы.
#include <stdio.h>
#include <stdlib.h>
#include <linux/ipc.h>
#include <linux/msg.h>

/* Redefines the struct msgbuf */
typedef struct mymsgbuf
{
  long mtype;
  int int_num;
  float float_num;
  char ch;
} mess_t;

int main()
{
  int qid;
  key_t msgkey;

  mess_t sent;
  mess_t received;

  int length;

  /* Initializes the seed of the pseudo-random number generator */
  srand (time (0));

  /* Length of the message */
  length = sizeof(mess_t) - sizeof(long);

  msgkey = ftok(".",'m');

  /* Creates the queue*/
  qid = msgget(msgkey, IPC_CREAT | 0660);

  printf("QID = %d\n", qid);

  /* Builds a message */
  sent.mtype = 1;
  sent.int_num = rand();
  sent.float_num = (float)(rand())/3;
  sent.ch = 'f';

  /* Sends the message */
  msgsnd(qid, &sent, length, 0);
  printf("MESSAGE SENT...\n");

  /* Receives the message */
  msgrcv(qid, &received, length, sent.mtype, 0);
  printf("MESSAGE RECEIVED...\n");

  /* Controls that received and sent messages are equal */
  printf("Integer number = %d (sent %d) -- ", received.int_num,
         sent.int_num);
  if(received.int_num == sent.int_num) printf(" OK\n");
  else printf("ERROR\n");

  printf("Float numero = %f (sent %f) -- ", received.float_num,
         sent.float_num);
  if(received.float_num == sent.float_num) printf(" OK\n");
  else printf("ERROR\n");

  printf("Char = %c (sent %c) -- ", received.ch, sent.ch);
  if(received.ch == sent.ch) printf(" OK\n");
  else printf("ERROR\n");

  /* Destroys the queue */
  msgctl(qid, IPC_RMID, 0);
}

Теперь еще один пример - создадим два процесса и заставим их взаимодействовать через очередь сообщений; остановимся немного на концепции порождения процессов: значения всех переменных родительского процесса переходят к порожденному ( memory copy ). Это значит, что нам необходимо создать очередь до порождения, чтобы и процесс-родитель и процесс-потомок могли знать идентификатор очереди и обращаться к ней.

Что происходит в этой небольшой программе: посредством очереди попрожденный процесс пересылает данные процессу потомку - порожденный процесс генерирует случайные числа и пересылает их процессу родителю и они оба выводят их в стандартный поток вывода.

#include <stdio.h>
#include <stdlib.h>
#include <linux/ipc.h>
#include <linux/msg.h>
#include <sys/types.h>

/* Redefines the message structure */
typedef struct mymsgbuf
{
  long mtype;
  int num;
} mess_t;

int main()
{
  int qid;
  key_t msgkey;
  pid_t pid;

  mess_t buf;

  int length;
  int cont;

  length = sizeof(mess_t) - sizeof(long);

  msgkey = ftok(".",'m');

  qid = msgget(msgkey, IPC_CREAT | 0660);

  if(!(pid = fork())){
    printf("SON - QID = %d\n", qid);

    srand (time (0));

    for(cont = 0; cont < 10; cont++){
      sleep (rand()%4);
      buf.mtype = 1;
      buf.num = rand()%100;
      msgsnd(qid, &buf, length, 0);
      printf("SON - MESSAGE NUMBER %d: %d\n", cont+1, buf.num);
    }

    return 0;
  }

  printf("FATHER - QID = %d\n", qid);

  for(cont = 0; cont < 10; cont++){
    sleep (rand()%4);
    msgrcv(qid, &buf, length, 1, 0);
    printf("FATHER - MESSAGE NUMBER %d: %d\n", cont+1, buf.num);
  }

  msgctl(qid, IPC_RMID, 0);

  return 0;
}

Мы создали два процесса, взаимодействующих через систему обмена сообщениями. В данном случае нет необходимости в протоколе - взаимодействие очень простое: в следующей заметке мы поговорим снова об очередях сообщений и об управлении различными типами сообщений. Также мы рассмотрим протокол взаимодействия, чтобы начать строить наш большой IPC проект.  

Рекомендуемая литература

 

Страница отзывов

У каждой заметки есть страница отзывов. На этой странице вы можете оставить свой комментарий или просмотреть комментарии других читателей
 talkback page 

<--, перейти к начальной странице выпуска

Webpages maintained by the LinuxFocus Editor team
© Leonardo Giordani, FDL
LinuxFocus.org
Translation information:
it --> -- : Leonardo Giordani <leo.giordani(at)libero.it>
it --> en: Leonardo Giordani <leo.giordani(at)libero.it>
en --> ru: Kirill Pukhlyakov <kirill(at)linuxfocus.org>

2003-09-25, generated by lfparser version 2.43