189 8069 5689

Qt服务端多线程的示例分析

这篇文章给大家分享的是有关Qt服务端多线程的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

创新互联于2013年开始,先为武川等服务建站,武川等地企业,进行企业商务咨询服务。为武川企业网站制作PC+手机+微官网三网同步一站式服务解决您的所有建站问题。

 该例子仅使用两个线程, 一个线程负责监听新的连接,一个线程用来处理已经建立连接的客户端事件(此处可以用一个线程池来提高性能)。消息接收加入了一一个简单分包机制,每条消息的前四个字节存储的是 uint32_t 类型,指该条消息整个长度, 这样就可以很好区分出每个消息。该代码在许多细节上有些不严谨的地方,仅供从参考

// tcpserver.h

class CClientSocket;

class CTcpServer final : public QTcpServer
{
    Q_OBJECT
public:
    explicit CTcpServer(QObject *parent = nullptr);
    virtual ~CTcpServer() override;

    void Listen(int _iPort);
    void Nortify(const QByteArray &_csMessage);

protected:
    virtual void incomingConnection(qintptr socketDescriptor) override;
    virtual void timerEvent(QTimerEvent *event) override;

public slots:
    void SLOT_ClientDisconnect();

private:
    void _PackageMessage(QByteArray &_baMsg);

private:
    QList> m_lstSocket;     ///< 连接的客户端
    QThread * m_pEventThd;  ///< 事件线程
};
// tcpserver.cpp
CTcpServer::CTcpServer(QObject *parent) : QTcpServer(parent),
    m_pEventThd(new QThread())
{
    m_lstSocket.clear();
}

CTcpServer::~CTcpServer()
{

}

void CTcpServer::Listen(int _iPort)
{
    this->listen(QHostAddress::Any, static_cast(_iPort));
    m_pEventThd->start();
    QObject::startTimer(5 * 1000);
}

void CTcpServer::Nortify(const QByteArray &_csMessage)
{
    QByteArray baSendMsg = _csMessage;
    _PackageMessage(baSendMsg);
    for (auto pClientSocket : m_lstSocket)
    {
        pClientSocket->SendMsg(baSendMsg);
    }

    QThread::msleep(50);
}

void CTcpServer::incomingConnection(qintptr socketDescriptor)
{
    qDebug() << "#################MainThread:" << QThread::currentThread() << m_pEventThd;
    std::shared_ptr pClient = std::make_shared(socketDescriptor, nullptr);
    connect(pClient.get(), &CClientSocket::SIGNAL_Disconneted, this, &CTcpServer::SLOT_ClientDisconnect);
    pClient->InitSocket(m_pEventThd);
    m_lstSocket.push_back(pClient);
    emit newConnection();
}

void CTcpServer::timerEvent(QTimerEvent *event)
{
    this->Nortify("hello world");
}

void CTcpServer::SLOT_ClientDisconnect()
{
    CClientSocket *pClient = dynamic_cast(QObject::sender());
    if (pClient)
    {
        for (const auto &index : m_lstSocket)
        {
            if (index.get() == pClient)
            {
                m_lstSocket.removeOne(index);
                return;
            }
        }
    }
}

void CTcpServer::_PackageMessage(QByteArray &_baMsg)
{
    uint32_t iSize = static_cast(_baMsg.size());
    iSize = ::ntohl(iSize);
    _baMsg.prepend(reinterpret_cast(&iSize), sizeof (iSize));
}
// clientsocket
class CClientSocket : public QTcpSocket
{
    Q_OBJECT
private:
    struct TMsgCache
    {
        void Clear()
        {
            iSize = 0;
            baPacket = "";
        }

        size_t iSize = 0;  ///< 包的实际长度 去除包头长度
        QByteArray baPacket = ""; ///< 原始字段
    };

public:
    explicit CClientSocket(int _iFd, QObject *parent = nullptr);
    virtual ~CClientSocket() override;
    void InitSocket(QThread * _pThread);
    void SendMsg(const QByteArray &_baMessage);

protected:
    virtual void timerEvent(QTimerEvent *event) override;

private:
    void _DeInitSocket();
    void _UpdateHeartTime();

private:
    Q_INVOKABLE void _StartCheckTimer();
    Q_INVOKABLE void _SendMessage(const QByteArray &_baMessage);

private slots:
    void SLOT_ReadyRead();
    void SLOT_SocketError(QAbstractSocket::SocketError _eError);
    void SLOT_Disconnect();

signals:
    void SIGNAL_Disconneted();

private:
    QString m_sCabinetCode;     ///< 柜体编号
    qint64 m_iOldResponseTimeStamp;     ///< 上一次响应的时间戳
    int m_iTimeId;  ///< 心跳包检测时间
    TMsgCache m_tMsgCache;  ///< 消息缓存结构体
};
// clientsoket.cpp
#define READ_MAX_SIZE 1024

CClientSocket::CClientSocket(int _iFd, QObject *parent) : QTcpSocket(parent)
{
    this->setSocketDescriptor(_iFd);
    connect(this, SIGNAL(error(QAbstractSocket::SocketError)), this,
            SLOT(SLOT_SocketError(QAbstractSocket::SocketError)));
    connect(this, &QTcpSocket::readyRead, this, &CClientSocket::SLOT_ReadyRead);
    connect(this, &QTcpSocket::disconnected, this, &CClientSocket::SLOT_Disconnect);
    m_iOldResponseTimeStamp = QDateTime::currentDateTime().toSecsSinceEpoch();
}

CClientSocket::~CClientSocket()
{
    this->close();
    qDebug() << "###################CClientSocket destruct";
}

void CClientSocket::InitSocket(QThread *_pThread)
{
    this->moveToThread(_pThread);
    QMetaObject::invokeMethod(this, &CClientSocket::_StartCheckTimer);
}

void CClientSocket::SendMsg(const QByteArray &_baMessage)
{
    QMetaObject::invokeMethod(this, "_SendMessage", Q_ARG(const QByteArray&, _baMessage));
}

void CClientSocket::timerEvent(QTimerEvent *event)
{
    if (m_iTimeId == event->timerId())
    {
       if (abs(QDateTime::currentSecsSinceEpoch() - m_iOldResponseTimeStamp) > 60)
       {
            this->disconnectFromHost();
       }
    }
}

void CClientSocket::_DeInitSocket()
{
    this->close();
    QObject::killTimer(m_iTimeId);
    m_iTimeId = 0;
}

void CClientSocket::_UpdateHeartTime()
{
    m_iOldResponseTimeStamp = QDateTime::currentDateTime().toSecsSinceEpoch();
}

void CClientSocket::_StartCheckTimer()
{
    qDebug() << "##########################_StartCheckTimer";
    m_iTimeId = QObject::startTimer(1000 * 5);
}

void CClientSocket::_SendMessage(const QByteArray &_baMessage)
{
    if (this->isWritable())
    {
        this->write(_baMessage);
        this->flush();
    }
}

void CClientSocket::SLOT_ReadyRead()
{
    char cBuffer[READ_MAX_SIZE];
    qint64 iReadSize = 0;
    QByteArray baNewCache;
    do{
        iReadSize = this->read(cBuffer, READ_MAX_SIZE);
        if (iReadSize == -1) ///< 网络异常
        {
            qDebug() << QString("############################Read Socket Error, %1:%2").arg(this->peerAddress().toString())
                        .arg(this->peerPort());
            this->SLOT_Disconnect();
            return;
        }

        if (iReadSize != 0)
        {
            baNewCache.append(cBuffer, static_cast(iReadSize));
        }
    }while (iReadSize != 0);

    if (m_tMsgCache.baPacket.size() != 0)
    {
        baNewCache = m_tMsgCache.baPacket + baNewCache;
    }

    while (baNewCache.size() > 0)
    {
        if (baNewCache.size() > 4)
        {
            uint32_t iSize;
            if (m_tMsgCache.iSize == 0)
            {
                QByteArray baSize = baNewCache.mid(0, 4);
                ::memcpy(&iSize, baSize.data(), sizeof(iSize));
                iSize = ::ntohl(iSize);
            }
            else{
                iSize = m_tMsgCache.iSize;
            }

            if (baNewCache.size() >= static_cast(iSize)) // 分解出一个完整的消息包
            {
                m_tMsgCache.baPacket = baNewCache.mid(4, static_cast(iSize - 4));
//                // 动作:推入到执行线程队列
//                m_pHandleMessageThd->Push(m_tMsgCache.baPacket );

                // 重置缓存状态
                m_tMsgCache.Clear();
                // 检测下一个新的消息包

                baNewCache = baNewCache.mid(static_cast(iSize));
            }
            else {
                m_tMsgCache.iSize = iSize;
                m_tMsgCache.baPacket = baNewCache;
                break;
            }
        }
        else{
            // 没有完整4字节长度值
            m_tMsgCache.iSize = 0;
            m_tMsgCache.baPacket = baNewCache;
            break;
        }
    }
    this->_UpdateHeartTime();
}

void CClientSocket::SLOT_SocketError(QAbstractSocket::SocketError _eError)
{
   qDebug() << QString("CClientSocket(%1:%2)disconnet, error:%3").arg(this->peerAddress().toString())
            .arg(this->peerPort()).arg(_eError);
   _DeInitSocket();
   emit SIGNAL_Disconneted();
}

void CClientSocket::SLOT_Disconnect()
{
    qDebug() << QString("client disconnect");
    _DeInitSocket();
    emit SIGNAL_Disconneted();
}

感谢各位的阅读!关于“Qt服务端多线程的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!


文章名称:Qt服务端多线程的示例分析
分享URL:http://gzruizhi.cn/article/pcepij.html

其他资讯