c++ Linux simple io completion ports in user mode

تم تحميل الصفحة في 1,2371841 ثانية
c++ Linux simple io completion ports in user mode
إنضم
27 يناير 2018
المشاركات
578
الإعجابات
522
النقاط
93
مرحبا
هذا كود بسيط بدأت العمل عليه من يوم لتسهيل كتابة سيرفرات في الويندوز واللينكس معا

المشكلة أن الويندوز يوفر طريقة io completion ports وهي طريقة جيدة وتسهل كتابة السيرفرات وعمليات الكتابة والقراءة

أما اللينكس ومشتقاته مثل أندرويد فلا يدعم async io يعني لا يمكنك بدء طلب كتابة أو قراءة ثم انتظار النتيجة لكنه يدعم non blocking mode وفيه poll و epoll وهي أصعب وأكثر تقيدا من طريقة الويندوز

لكن عند شراء vps تكون أسعار اللينكس أرخص غالبا لأنه مجاني فلا يمكن استخدام برامج الويندوز إلا مع wine وهي ثقيلة علي السيرفر لأن الإمكانيات حسب الدفع

لذلك قمت بكتابة هذا الكود لتسهيل كتابة سيرفر علي كلا النظامين بدون أكواد مختلفة
والكود لا زال في مرحلة مبكرة علي الاستخدام في السيرفرات بشكل مستقر

أيضا تنسيق الكود ليس جيدا ومكتوب كله في ملف واحد لأني أكتب من علي الهاتف ببرنامج cxxdroid
وجربته فقط علي الأندرويد علي ملفات الهارد ديسك ولم أجربه علي السوكت بعد

مع ملاحظة أنه مصمم خصيصا للمقابس التي تعمل في وضع non blocking أي أنه غير مفيد جدا للملفات


C++:
#include <queue>
#include <condition_variable>
#include <unistd.h>
#include <sys/epoll.h>
#include <thread>
#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

using namespace std;

class SafePrinter
{
public :
    
    SafePrinter& operator<<(ostream& (*pf) (ostream&))
    {
        std::cout << pf;
        return *this;
    }
    
    template<class T>
    SafePrinter& operator<<(T p)
    {
        std::lock_guard<std::mutex> lock(mtx);
        std::cout << p;
        return *this;
    }
private :
    std::mutex mtx;
};

static SafePrinter printer;

template <class T>
class SafeQueue
{
public:
    
    SafeQueue()
    {}
    void Push(const T& val);
    void Push(T&& val);
    void Pop(T& val);
    bool Pop(T& val, unsigned long long timeout);
    T Pop();
    void Clear();
    bool IsEmpty();
    size_t Size();
private :
    std::queue<T> q;
    std::mutex mtx;
    std::condition_variable cv;   
};

template<class T>
inline void SafeQueue<T>::Push(const T& val)
{
    mtx.lock();
    q.emplace(val);
    mtx.unlock();
    cv.notify_one();
}

template<class T>
inline void SafeQueue<T>::Push(T&& val)
{
    mtx.lock();
    q.emplace(std::move(val));
    mtx.unlock();
    cv.notify_one();
}

template<class T>
inline void SafeQueue<T>::Pop(T& val)
{
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, [this] {return !q.empty();});
    val = q.front();
    q.pop();
}

template<class T>
inline bool SafeQueue<T>::Pop(T& val, unsigned long long timeout)
{
    std::unique_lock<std::mutex> lock(mtx);
    bool result = cv.wait_for(lock, std::chrono::milliseconds(timeout), [this] {return !q.empty();});
    if (result)
    {
        val = q.front();
        q.pop();
    }
    return result;
}

template <class T>
inline T SafeQueue<T>::Pop()
{
    T val;
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, [this] {return !q.empty();});
    val = q.front();
    q.pop();
    return val;
}

template <class T>
inline void SafeQueue<T>::Clear()
{
    std::lock_guard<std::mutex> lock(mtx);
    while(!q.empty())
        q.pop();
}

template <class T>
inline bool SafeQueue<T>::IsEmpty()
{
    std::lock_guard<std::mutex> lock(mtx);
    return q.empty();
}

template <class T>
inline size_t SafeQueue<T>::Size()
{
    std::unique_lock<std::mutex> lock(mtx);
    return q.size();
}

#define Io_Chunk_Size size_t(1024)

enum IoOpCode
{
    IoWriteCode,
    IoReadCode,
    IoSpecialCode,
};

enum class IoStatusCode
{
    IoStatusOk,
    IoStatusPending,
    IoStatusFailed,
};

struct IoContext
{
    IoStatusCode code;
    unsigned int bytes;
    void *buffer;
    unsigned int key;
};

struct IoOperationStruct
{
    IoOperationStruct()
    {}
    IoOperationStruct(const epoll_event& ev);
    std::shared_ptr<SafeQueue<IoContext*>> rport;
    int fd;
    char *buffer;
    IoOpCode code;
    IoContext *context;
    size_t rest;
    size_t offset;
};

static int io_epoll_instance;

static SafeQueue<IoOperationStruct*> io_requests;

static bool IoStopFlag;

void IoMainThread()
{
    epoll_event evs[50];
    while(!IoStopFlag)
    {
        int num = epoll_wait(io_epoll_instance, evs, 50, 10000);
        if (num < 0)
            return;
        for (int i = 0; i < num; ++i)
        {
            io_requests.Push((IoOperationStruct*)evs[i].data.ptr);
        }
    }
}

bool AddIoToQueue(IoOperationStruct *IoStruct, bool modify = false)
{
    epoll_event ev;
    ev.events = EPOLLONESHOT;
    switch(IoStruct->code)
    {
        case IoWriteCode :
            ev.events |= EPOLLOUT;
            break;
        case IoReadCode :
            ev.events |= EPOLLIN;
            break;
        default:
            break;
    }
    ev.data.ptr = IoStruct;
    bool res = epoll_ctl(io_epoll_instance, modify ? EPOLL_CTL_MOD : EPOLL_CTL_ADD,IoStruct->fd,  &ev) != -1;
    if (!res)
    {
        if (IoStruct->code == IoOpCode::IoSpecialCode || errno == EPERM)
        {
            res = true;
            io_requests.Push(IoStruct);
        }
    }
    return res;
}

struct HANDLE
{
    shared_ptr<SafeQueue<IoContext*>> q;
    int fd;
    unsigned int key;
};

HANDLE CreateIoCompletionPort()
{
    HANDLE port;
    port.q = make_shared<SafeQueue<IoContext*>>();
    port.fd = 0;
    return port;
}

void CloseHandle(HANDLE Handle)
{
    Handle.q.reset();
    if (Handle.fd)
        close(Handle.fd);
    Handle.fd = 0;
}

HANDLE BindToPort(HANDLE port, int fd, unsigned int key)
{
    HANDLE handle;
    handle.fd = fd;
    handle.q = port.q;
    handle.key = key;
    return handle;
}

bool GetQueuedCompletionStatus(HANDLE port, unsigned int& num_of_bytes, IoContext*& ctx, unsigned int& key, unsigned long long wait_time)
{
    if(!port.q->Pop(ctx, wait_time))
        return false;
    num_of_bytes = ctx->bytes;
    key = ctx->key;
    if (num_of_bytes)
        ctx->code = IoStatusCode::IoStatusOk;
    else
        ctx->code = IoStatusCode::IoStatusFailed;
    return true;
}

bool PostQueuedCompletionStatus(HANDLE port, unsigned int num_of_bytes, unsigned int key, IoContext& ctx)
{
    IoOperationStruct *IoStruct = new IoOperationStruct();
    IoStruct->buffer = nullptr;
    IoStruct->code = IoOpCode::IoSpecialCode;
    IoStruct->context = &ctx;
    IoStruct->fd = -1;
    IoStruct->offset = 0;
    IoStruct->rest = num_of_bytes;
    IoStruct->rport = port.q;
    IoStruct->context->buffer = nullptr;
    IoStruct->context->bytes = num_of_bytes;
    IoStruct->context->key = key;
    return AddIoToQueue(IoStruct);
}

bool WriteFile(HANDLE Handle, const void *data, size_t size, IoContext * Ctx)
{
    if (!data || !size || !Ctx)
        return false;
    IoOperationStruct *IoStruct = new IoOperationStruct();
    IoStruct->fd = Handle.fd;
    IoStruct->rport = Handle.q;
    IoStruct->buffer = (char*)data;
    IoStruct->context = Ctx;
    IoStruct->offset = 0;
    IoStruct->rest = size;
    IoStruct->code = IoOpCode::IoWriteCode;
    IoStruct->context->bytes = 0;
    IoStruct->context->code = IoStatusCode::IoStatusPending;
    return AddIoToQueue(IoStruct);
}

bool ReadFile(HANDLE Handle, void *data, size_t size, IoContext *Ctx)
{
    if (!data || !size || !Ctx)
        return false;
    IoOperationStruct *IoStruct = new IoOperationStruct();
    IoStruct->fd = Handle.fd;
    IoStruct->rport = Handle.q;
    IoStruct->buffer = (char*)data;
    IoStruct->context = Ctx;
    IoStruct->offset = 0;
    IoStruct->rest = size;
    IoStruct->code = IoOpCode::IoReadCode;
    IoStruct->context->bytes = 0;
    IoStruct->context->code = IoStatusCode::IoStatusPending;
    return AddIoToQueue(IoStruct);
}

bool IoHandleWrite(IoOperationStruct & IoStruct)
{
    if (!IoStruct.rest)
        return false;
    size_t to_write = std::min(IoStruct.rest, Io_Chunk_Size);
    int written = write(IoStruct.fd, IoStruct.buffer + IoStruct.offset, to_write);
    if (written < 0)
    {
        if (errno == EWOULDBLOCK || errno == EAGAIN)
            return true;
        return false;
    }
    IoStruct.offset += written;
    IoStruct.rest -= written;
    IoStruct.context->bytes += written;
    if (!IoStruct.rest)
        return false;
    return true;   
}

bool IoHandleRead(IoOperationStruct& IoStruct)
{
    if (!IoStruct.rest)
        return false;
    size_t to_read = std::min(IoStruct.rest, Io_Chunk_Size);
    int was_read = read(IoStruct.fd, IoStruct.buffer + IoStruct.offset, to_read);
    if (was_read < 0)
    {
        if (errno == EWOULDBLOCK || errno == EAGAIN)
            return true;
        return false;
    }
    IoStruct.offset += was_read;
    IoStruct.context->bytes += was_read;
    IoStruct.rest -= was_read;
    if (!IoStruct.rest)
        return false;
    return true;
}

void IoHandleResult(bool result, IoOperationStruct *IoStruct)
{
    if (!result)
    {
        shared_ptr<SafeQueue<IoContext*>> rport = IoStruct->rport;
        IoContext *Ctx = IoStruct->context;
        delete IoStruct;
        rport->Push(Ctx);
    }
    else
    {
        if (!AddIoToQueue(IoStruct, true))
            return IoHandleResult(false, IoStruct);
    }   
}

void IoHandlerThread()
{
    IoOperationStruct *IoStruct;
    while(!IoStopFlag)
    {
        io_requests.Pop(IoStruct);
        printer << "performing an io from " << this_thread::get_id() << endl;
        if (IoStruct->code == IoOpCode::IoWriteCode)
        {
            bool result = IoHandleWrite(*IoStruct);
            IoHandleResult(result, IoStruct);       
        }
        else if (IoStruct->code == IoOpCode::IoReadCode)
        {
            bool result = IoHandleRead(*IoStruct);
            IoHandleResult(result, IoStruct);
        }
        else if (IoStruct->code == IoOpCode::IoSpecialCode)
        {
            IoHandleResult(false, IoStruct);
        }
    }
}

void start_io_ports()
{
    io_epoll_instance = epoll_create(10);
    if (io_epoll_instance < 0)
        cout << "failed to open epoll with error : " << strerror(errno) << endl;
    IoStopFlag = false;
    std::thread(IoMainThread).detach();
    for (size_t i = 0; i < std::thread::hardware_concurrency(); ++i)
    {
        std::thread(IoHandlerThread).detach();
    }
}

HANDLE port;

std::condition_variable cv;
std::mutex mtx;

void worker_thread()
{
    printer << "id : " << this_thread::get_id() << endl;
    while (1)
    {
        unsigned int bytes;
        unsigned int key;
        IoContext *ctx;
        if(!GetQueuedCompletionStatus(port, bytes, ctx, key, 1000))
        {
            printer << "timeout !" << endl;
            continue;
        }
        if (!key && !bytes)
        {
            printer << "the worker is asked to exit !" << endl;
            delete ctx;
            return;
        }
        printer << " ======== " << endl;
        printer << "io was performed " << endl;
        printer << "bytes transferred : " << bytes << endl;
        printer << " ========= " << endl;
        delete (char*)ctx->buffer;
        delete ctx;
    }
}

int main()
{
    start_io_ports();
    port = CreateIoCompletionPort();
    int fd = open("/sdcard/test.txt", O_WRONLY | O_CREAT);
    if (fd < 0)
    {
        printer << "error : " << strerror(errno) << endl;
        return 0;
    }
    HANDLE file = BindToPort(port, fd, 5);
    std::thread(worker_thread).detach();
    int times = 10;
    while (times--)
    {
        printer << "submitting a job" << endl;
        char *buff = new char[10010];
        IoContext *ctx = new IoContext();
        ctx->buffer = buff;
        size_t to_write = 0;
        for (size_t i = 0; i < 10000; i += 9)
        {
            memcpy(buff + i, "ttttttttt", 9);
            to_write += 9;
        }
        if(!WriteFile(file, buff,to_write,  ctx))
            cout << "failed !" << endl;
        sleep(2);
    }
    IoContext *ctx = new IoContext();
    PostQueuedCompletionStatus(port, 0, 0, *ctx);
    CloseHandle(file);
    CloseHandle(port);
    sleep(10000000000000000);
}
 

الأعضاء النشطين حاليآ الذين يشاهدون هذا الموضوع (1 عضو و 0 ضيف)

خيارات الاستايل

نوع الخط
مودك
اخفاء السايدر بار OFF
توسيط المنتدى OFF
فصل الأقسام OFF
الأقسام الفرعية OFF
عرض المشاركات
حجم الخط
معلومات العضو OFF
إخفاء التوقيع OFF

إرجاع خيارات الإستايل